<!-- blob: ed919e0465a90d27aaa62e1a2d1f909509ce6a07 [file] [log] [blame] -->
<!doctype html><html lang=en class=no-js><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><title>Apache Hadoop Input/Output Format IO</title><meta name=description content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes."><link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400,500,700" rel=stylesheet><link rel=preload href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css as=style><link href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css rel=stylesheet integrity><script src=https://code.jquery.com/jquery-2.2.4.min.js></script><style>.body__contained img{max-width:100%}</style><script type=text/javascript src=/js/bootstrap.min.2979f9a6e32fc42c3e7406339ee9fe76b31d1b52059776a02b4a7fa6a4fd280a.js defer></script>
<!-- Site behaviour scripts. Each is loaded with `defer`, so it is fetched in parallel and executed in document order after parsing. The `type` attribute is omitted because JavaScript is the default script type. -->
<script src=/js/language-switch-v2.min.121952b7980b920320ab229551857669209945e39b05ba2b433a565385ca44c6.js defer></script>
<script src=/js/fix-menu.min.039174b67107465f2090a493f91e126f7aa797f29420f9edab8a54d9dd4b3d2d.js defer></script>
<script src=/js/section-nav.min.1405fd5e70fab5f6c54037c269b1d137487d8f3d1b3009032525f6db3fbce991.js defer></script>
<script src=/js/page-nav.min.af231204c9c52c5089d53a4c02739eacbb7f939e3be1c6ffcc212e0ac4dbf879.js defer></script>
<script src=/js/expandable-list.min.75a4526624a3b8898fe7fb9e3428c205b581f8b38c7926922467aef17eac69f2.js defer></script>
<script src=/js/copy-to-clipboard.min.364c06423d7e8993fc42bb4abc38c03195bc8386db26d18774ce775d08d5b18d.js defer></script>
<script src=/js/calendar.min.336664054fa0f52b08bbd4e3c59b5cb6d63dcfb2b4d602839746516b0817446b.js defer></script>
<script src=/js/fix-playground-nested-scroll.min.0283f1037cb1b9d5074c6eaf041292b524a8148a7cdb803d5ccd6d1fc4eb3253.js defer></script>
<script src=/js/anchor-content-jump-fix.min.22d3240f81632e4c11179b9d2aaf37a40da9414333c43aa97344e8b21a7df0e4.js defer></script>
<link rel=alternate type=application/rss+xml title="Apache Beam" href=/feed.xml><link rel=canonical href=/documentation/io/built-in/hadoop/ data-proofer-ignore><link rel="shortcut icon" type=image/x-icon href=/images/favicon.ico><link rel=stylesheet href=https://use.fontawesome.com/releases/v5.4.1/css/all.css integrity=sha384-5sAR7xN1Nv6T6+dT2mhtzEpVJvfS3NScPQTrOxhwjIuvcA67KV2R5Jz6kr4abQsz crossorigin=anonymous><link rel=stylesheet href=https://unpkg.com/swiper@8/swiper-bundle.min.css><script async src=https://platform.twitter.com/widgets.js></script>
<!-- Google Analytics loader: installs a queueing `ga` stub, injects analytics.js asynchronously before the first script element, then creates tracker UA-73650088-1 and sends a pageview. The library URL is pinned to https so it never inherits an insecure or unusable scheme from the page. -->
<script>(function(e,t,n,s,o,i,a){e.GoogleAnalyticsObject=o,e[o]=e[o]||function(){(e[o].q=e[o].q||[]).push(arguments)},e[o].l=1*new Date,i=t.createElement(n),a=t.getElementsByTagName(n)[0],i.async=1,i.src=s,a.parentNode.insertBefore(i,a)})(window,document,"script","https://www.google-analytics.com/analytics.js","ga"),ga("create","UA-73650088-1","auto"),ga("send","pageview")</script>
<!-- Hotjar loader: installs a queueing `hj` stub, records the settings (hjid 2182187, hjsv 6) and appends an async script to <head> whose URL is assembled from those settings. -->
<script>(function(e,t,n,s,o,i){e.hj=e.hj||function(){(e.hj.q=e.hj.q||[]).push(arguments)},e._hjSettings={hjid:2182187,hjsv:6},o=t.getElementsByTagName("head")[0],i=t.createElement("script"),i.async=1,i.src=n+e._hjSettings.hjid+s+e._hjSettings.hjsv,o.appendChild(i)})(window,document,"https://static.hotjar.com/c/hotjar-",".js?sv=")</script>
</head><body class=body data-spy=scroll data-target=.page-nav data-offset=0><nav class="navigation-bar-mobile header navbar navbar-fixed-top"><div class=navbar-header><a href=/ class=navbar-brand><img alt=Brand style=height:46px;width:43px src=/images/beam_logo_navbar_mobile.png></a>
<a class=navbar-link href=/get-started/>Get Started</a>
<a class=navbar-link href=/documentation/>Documentation</a>
<button type=button class="navbar-toggle menu-open" aria-expanded=false aria-controls=navbar onclick=openMenu()>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button></div><div class="navbar-mask closed"></div><div id=navbar class="navbar-container closed"><button type=button class=navbar-toggle aria-expanded=false aria-controls=navbar id=closeMenu>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button><ul class="nav navbar-nav"><li><div class=searchBar-mobile><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search></div></li><li><a class=navbar-link href=/about>About</a></li><li><a class=navbar-link href=/get-started/>Get Started</a></li><li><span class=navbar-link>Documentation</span><ul><li><a href=/documentation/>General</a></li><li><a href=/documentation/sdks/java/>Languages</a></li><li><a href=/documentation/runners/capability-matrix/>Runners</a></li><li><a href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><li><a class=navbar-link href=/roadmap/>Roadmap</a></li><li><a class=navbar-link href=/community/>Community</a></li><li><a class=navbar-link href=/contribute/>Contribute</a></li><li><a class=navbar-link href=/blog/>Blog</a></li><li><a class=navbar-link href=/case-studies/>Case Studies</a></li></ul><ul class="nav navbar-nav navbar-right"><li><a href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/io/built-in/hadoop.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a></li><li class=dropdown><a href=# class=dropdown-toggle id=apache-dropdown data-toggle=dropdown role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class="dropdown-menu dropdown-menu-right"><li><a target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a target=_blank href=https://www.apache.org/security/>Security</a></li><li><a target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></ul></div></nav><nav class=navigation-bar-desktop><a href=/ class=navbar-logo><img src=/images/beam_logo_navbar.png alt="Beam Logo"></a><div class=navbar-bar-left><div class=navbar-links><a class=navbar-link href=/about>About</a>
<a class=navbar-link href=/get-started/>Get Started</a><li class="dropdown navbar-dropdown navbar-dropdown-documentation"><a href=# class="dropdown-toggle navbar-link" role=button aria-haspopup=true aria-expanded=false>Documentation
<span><svg xmlns="http://www.w3.org/2000/svg" width="12" height="11" fill="none" viewBox="0 0 12 11"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10.666 4.535 5.847 9.108 1.444 4.535"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link href=/documentation/>General</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/sdks/java/>Languages</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/runners/capability-matrix/>Runners</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><a class=navbar-link href=/roadmap/>Roadmap</a>
<a class=navbar-link href=/community/>Community</a>
<a class=navbar-link href=/contribute/>Contribute</a>
<a class=navbar-link href=/blog/>Blog</a>
<a class=navbar-link href=/case-studies/>Case Studies</a></div><div id=iconsBar><a type=button onclick=showSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M10.191 17c3.866.0 7-3.134 7-7s-3.134-7-7-7-7 3.134-7 7 3.134 7 7 7zm11 4-6-6"/></svg></a><a target=_blank href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/io/built-in/hadoop.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a><li class="dropdown navbar-dropdown navbar-dropdown-apache"><a href=# class=dropdown-toggle role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/security/>Security</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></div><div class="searchBar disappear"><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search>
<a type=button onclick=endSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="25" fill="none" viewBox="0 0 25 25"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M21.122 20.827 4.727 4.432M21.122 4.43 4.727 20.827"/></svg></a></div></div></nav><div class=header-push></div><div class="top-banners swiper"><div class=swiper-wrapper><div class=swiper-slide><a href=https://tour.beam.apache.org><img class=banner-img-desktop src=/images/banners/tour-of-beam/tour-of-beam-desktop.png alt="Start Tour of Beam">
<img class=banner-img-mobile src=/images/banners/tour-of-beam/tour-of-beam-mobile.png alt="Start Tour of Beam"></a></div><div class=swiper-slide><a href=https://beam.apache.org/documentation/ml/overview/><img class=banner-img-desktop src=/images/banners/machine-learning/machine-learning-desktop.jpg alt="Machine Learning">
<img class=banner-img-mobile src=/images/banners/machine-learning/machine-learning-mobile.jpg alt="Machine Learning"></a></div></div><div class=swiper-pagination></div><div class=swiper-button-prev></div><div class=swiper-button-next></div></div><script src=/js/swiper-bundle.min.min.e0e8f81b0b15728d35ff73c07f42ddbb17a108d6f23df4953cb3e60df7ade675.js></script>
<script src=/js/sliders/top-banners.min.afa7d0a19acf7a3b28ca369490b3d401a619562a2a4c9612577be2f66a4b9855.js></script>
<script>
// Global handlers referenced by inline onclick attributes in the navigation
// markup; they must stay as top-level function declarations.

// Sets the placeholder text on every text input matched by jQuery's
// "input:text" selector.
function addPlaceholder() {
  $("input:text").attr("placeholder", "What are you looking for?");
}

// Reveals the search bar and hides the icon bar.
function showSearch() {
  addPlaceholder();
  var searchBar = document.querySelector(".searchBar");
  searchBar.classList.remove("disappear");
  var iconsBar = document.querySelector("#iconsBar");
  iconsBar.classList.add("disappear");
}

// Inverse of showSearch: hides the search bar and restores the icon bar.
function endSearch() {
  var searchBar = document.querySelector(".searchBar");
  searchBar.classList.add("disappear");
  var iconsBar = document.querySelector("#iconsBar");
  iconsBar.classList.remove("disappear");
}

// Toggles the "fixedPosition" class on <body>.
function blockScroll() {
  $("body").toggleClass("fixedPosition");
}

// Called when the mobile menu is opened: sets the input placeholder, then
// toggles the body's "fixedPosition" class.
function openMenu() {
  addPlaceholder();
  blockScroll();
}
</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Documentation</span></li><li><a href=/documentation>Using the Documentation</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Concepts</span><ul class=section-nav-list><li><a href=/documentation/basics/>Basics of the Beam model</a></li><li><a href=/documentation/runtime/model/>How Beam executes a pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Beam programming guide</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/>Overview</a></li><li><a href=/documentation/programming-guide/#creating-a-pipeline>Pipelines</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>PCollections</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pcollections>Creating a PCollection</a></li><li><a href=/documentation/programming-guide/#pcollection-characteristics>PCollection characteristics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transforms</span><ul class=section-nav-list><li><a 
href=/documentation/programming-guide/#applying-transforms>Applying transforms</a></li><li><span class=section-nav-list-title>Core Beam transforms</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pardo>ParDo</a></li><li><a href=/documentation/programming-guide/#groupbykey>GroupByKey</a></li><li><a href=/documentation/programming-guide/#cogroupbykey>CoGroupByKey</a></li><li><a href=/documentation/programming-guide/#combine>Combine</a></li><li><a href=/documentation/programming-guide/#flatten>Flatten</a></li><li><a href=/documentation/programming-guide/#partition>Partition</a></li></ul></li><li><a href=/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms>Requirements for user code</a></li><li><a href=/documentation/programming-guide/#side-inputs>Side inputs</a></li><li><a href=/documentation/programming-guide/#additional-outputs>Additional outputs</a></li><li><a href=/documentation/programming-guide/#composite-transforms>Composite transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Pipeline I/O</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pipeline-io>Using I/O transforms</a></li><li><a href=/documentation/io/connectors/>I/O connectors</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>I/O connector guides</span><ul class=section-nav-list><li><a href=/documentation/io/built-in/parquet/>Apache Parquet I/O connector</a></li><li><a href=/documentation/io/built-in/hadoop/>Hadoop Input/Output Format IO</a></li><li><a href=/documentation/io/built-in/hcatalog/>HCatalog IO</a></li><li><a href=/documentation/io/built-in/google-bigquery/>Google BigQuery I/O connector</a></li><li><a href=/documentation/io/built-in/snowflake/>Snowflake I/O connector</a></li><li><a href=/documentation/io/built-in/cdap/>CDAP I/O connector</a></li><li><a href=/documentation/io/built-in/sparkreceiver/>Spark Receiver I/O 
connector</a></li><li><a href=/documentation/io/built-in/singlestore/>SingleStoreDB I/O connector</a></li><li><a href=/documentation/io/built-in/webapis/>Web APIs I/O connector</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Developing new I/O connectors</span><ul class=section-nav-list><li><a href=/documentation/io/developing-io-overview/>Overview: Developing connectors</a></li><li><a href=/documentation/io/developing-io-java/>Developing connectors (Java)</a></li><li><a href=/documentation/io/developing-io-python/>Developing connectors (Python)</a></li><li><a href=/documentation/io/io-standards/>I/O Standards</a></li></ul></li><li><a href=/documentation/io/testing/>Testing I/O transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Schemas</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#what-is-a-schema>What is a schema</a></li><li><a href=/documentation/programming-guide/#schemas-for-pl-types>Schemas for programming language types</a></li><li><a href=/documentation/programming-guide/#schema-definition>Schema definition</a></li><li><a href=/documentation/programming-guide/#logical-types>Logical types</a></li><li><a href=/documentation/programming-guide/#creating-schemas>Creating schemas</a></li><li><a href=/documentation/programming-guide/#using-schemas>Using schemas</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data encoding and type safety</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#data-encoding-and-type-safety>Data encoding basics</a></li><li><a href=/documentation/programming-guide/#specifying-coders>Specifying coders</a></li><li><a href=/documentation/programming-guide/#default-coders-and-the-coderregistry>Default coders and the CoderRegistry</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Windowing</span><ul 
class=section-nav-list><li><a href=/documentation/programming-guide/#windowing>Windowing basics</a></li><li><a href=/documentation/programming-guide/#provided-windowing-functions>Provided windowing functions</a></li><li><a href=/documentation/programming-guide/#setting-your-pcollections-windowing-function>Setting your PCollection’s windowing function</a></li><li><a href=/documentation/programming-guide/#watermarks-and-late-data>Watermarks and late data</a></li><li><a href=/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements>Adding timestamps to a PCollection’s elements</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Triggers</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#triggers>Trigger basics</a></li><li><a href=/documentation/programming-guide/#event-time-triggers>Event time triggers and the default trigger</a></li><li><a href=/documentation/programming-guide/#processing-time-triggers>Processing time triggers</a></li><li><a href=/documentation/programming-guide/#data-driven-triggers>Data-driven triggers</a></li><li><a href=/documentation/programming-guide/#setting-a-trigger>Setting a trigger</a></li><li><a href=/documentation/programming-guide/#composite-triggers>Composite triggers</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Metrics</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#metrics>Metrics basics</a></li><li><a href=/documentation/programming-guide/#types-of-metrics>Types of metrics</a></li><li><a href=/documentation/programming-guide/#querying-metrics>Querying metrics</a></li><li><a href=/documentation/programming-guide/#using-metrics>Using metrics in pipeline</a></li><li><a href=/documentation/programming-guide/#export-metrics>Export metrics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>State and Timers</span><ul 
class=section-nav-list><li><a href=/documentation/programming-guide/#types-of-state>Types of state</a></li><li><a href=/documentation/programming-guide/#deferred-state-reads>Deferred state reads</a></li><li><a href=/documentation/programming-guide/#timers>Timers</a></li><li><a href=/documentation/programming-guide/#garbage-collecting-state>Garbage collecting state</a></li><li><a href=/documentation/programming-guide/#state-timers-examples>State and timers examples</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Splittable DoFns</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#sdf-basics>Basics</a></li><li><a href=/documentation/programming-guide/#sizing-and-progress>Sizing and progress</a></li><li><a href=/documentation/programming-guide/#user-initiated-checkpoint>User-initiated checkpoint</a></li><li><a href=/documentation/programming-guide/#runner-initiated-split>Runner initiated split</a></li><li><a href=/documentation/programming-guide/#watermark-estimation>Watermark estimation</a></li><li><a href=/documentation/programming-guide/#truncating-during-drain>Truncating during drain</a></li><li><a href=/documentation/programming-guide/#bundle-finalization>Bundle finalization</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Multi-language Pipelines</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#create-x-lang-transforms>Creating cross-language transforms</a></li><li><a href=/documentation/programming-guide/#use-x-lang-transforms>Using cross-language transforms</a></li><li><a href=/documentation/programming-guide/#x-lang-transform-runner-support>Runner Support</a></li></ul></li><li><a href=/documentation/programming-guide/#batched-dofns>Batched DoFns</a></li><li><a href=/documentation/programming-guide/#transform-service>Transform service</a></li></ul></li><li class=section-nav-item--collapsible><span 
class=section-nav-list-title>Pipeline development lifecycle</span><ul class=section-nav-list><li><a href=/documentation/pipelines/design-your-pipeline/>Design Your Pipeline</a></li><li><a href=/documentation/pipelines/create-your-pipeline/>Create Your Pipeline</a></li><li><a href=/documentation/pipelines/test-your-pipeline/>Test Your Pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Common pipeline patterns</span><ul class=section-nav-list><li><a href=/documentation/patterns/overview/>Overview</a></li><li><a href=/documentation/patterns/file-processing/>File processing</a></li><li><a href=/documentation/patterns/side-inputs/>Side inputs</a></li><li><a href=/documentation/patterns/pipeline-options/>Pipeline options</a></li><li><a href=/documentation/patterns/custom-io/>Custom I/O</a></li><li><a href=/documentation/patterns/custom-windows/>Custom windows</a></li><li><a href=/documentation/patterns/bigqueryio/>BigQueryIO</a></li><li><a href=/documentation/patterns/ai-platform/>AI Platform</a></li><li><a href=/documentation/patterns/schema/>Schema</a></li><li><a href=/documentation/patterns/bqml/>BigQuery ML</a></li><li><a href=/documentation/patterns/grouping-elements-for-efficient-external-service-calls/>Grouping elements for efficient external service calls</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>AI/ML pipelines</span><ul class=section-nav-list><li><a href=/documentation/ml/overview/>Get started with AI/ML</a></li><li><a href=/documentation/ml/about-ml/>About Beam ML</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Prediction and inference</span><ul class=section-nav-list><li><a href=/documentation/ml/inference-overview/>Overview</a></li><li><a href=/documentation/ml/multi-model-pipelines/>Build a pipeline with multiple models</a></li><li><a href=/documentation/ml/tensorrt-runinference>Build a custom model handler with 
TensorRT</a></li><li><a href=/documentation/ml/large-language-modeling>Use LLM inference</a></li><li><a href=/documentation/ml/multi-language-inference/>Build a multi-language inference pipeline</a></li><li><a href=/documentation/ml/side-input-updates/>Update your model in production</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data processing</span><ul class=section-nav-list><li><a href=/documentation/ml/preprocess-data/>Preprocess data</a></li><li><a href=/documentation/ml/data-processing/>Explore your data</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Workflow orchestration</span><ul class=section-nav-list><li><a href=/documentation/ml/orchestration/>Use ML-OPS workflow orchestrators</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Model training</span><ul class=section-nav-list><li><a href=/documentation/ml/per-entity-training>Per-entity training</a></li><li><a href=/documentation/ml/online-clustering/>Online clustering</a></li><li><a href=/documentation/ml/model-evaluation/>ML model evaluation</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Use cases</span><ul class=section-nav-list><li><a href=/documentation/ml/anomaly-detection/>Build an anomaly detection pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Reference</span><ul class=section-nav-list><li><a href=/documentation/ml/runinference-metrics/>RunInference metrics</a></li><li><a href=/documentation/ml/model-evaluation/>Model validation</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Runtime systems</span><ul class=section-nav-list><li><a href=/documentation/runtime/environments/>Container environments</a></li><li><a href=/documentation/runtime/resource-hints/>Resource hints</a></li><li><a 
href=/documentation/runtime/sdk-harness-config/>SDK Harness Configuration</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transform catalog</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Python</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Enrichment</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/enrichment/>Overview</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-bigtable/>Bigtable example</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-vertexai/>Vertex AI Feature Store examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/python/elementwise/flatmap/>FlatMap</a></li><li><a href=/documentation/transforms/python/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/python/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/python/elementwise/map/>Map</a></li><li><a href=/documentation/transforms/python/elementwise/mltransform/>MLTransform</a></li><li><a href=/documentation/transforms/python/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/python/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/python/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/python/elementwise/reify/>Reify</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>RunInference</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/runinference/>Overview</a></li><li><a 
href=/documentation/transforms/python/elementwise/runinference-pytorch/>PyTorch examples</a></li><li><a href=/documentation/transforms/python/elementwise/runinference-sklearn/>Sklearn examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/python/elementwise/values/>Values</a></li><li><a href=/documentation/transforms/python/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/python/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/python/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/combineglobally/>CombineGlobally</a></li><li><a href=/documentation/transforms/python/aggregation/combineperkey/>CombinePerKey</a></li><li><a href=/documentation/transforms/python/aggregation/combinevalues/>CombineValues</a></li><li><a href=/documentation/transforms/python/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/python/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/python/aggregation/groupby/>GroupBy</a></li><li><a href=/documentation/transforms/python/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/python/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/python/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/python/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/python/aggregation/min/>Min</a></li><li><a 
href=/documentation/transforms/python/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/python/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/python/aggregation/top/>Top</a></li><li><a href=/documentation/transforms/python/aggregation/tolist/>ToList</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/other/create/>Create</a></li><li><a href=/documentation/transforms/python/other/flatten/>Flatten</a></li><li><a href=/documentation/transforms/python/other/reshuffle/>Reshuffle</a></li><li><a href=/documentation/transforms/python/other/windowinto/>WindowInto</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Java</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/java/elementwise/flatmapelements/>FlatMapElements</a></li><li><a href=/documentation/transforms/java/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/java/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/java/elementwise/mapelements/>MapElements</a></li><li><a href=/documentation/transforms/java/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/java/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/java/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/java/elementwise/reify/>Reify</a></li><li><a href=/documentation/transforms/java/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/java/elementwise/values/>Values</a></li><li><a 
href=/documentation/transforms/java/elementwise/withkeys/>WithKeys</a></li><li><a href=/documentation/transforms/java/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/java/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/java/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/combine/>Combine</a></li><li><a href=/documentation/transforms/java/aggregation/combinewithcontext/>CombineWithContext</a></li><li><a href=/documentation/transforms/java/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/java/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/java/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/java/aggregation/hllcount/>HllCount</a></li><li><a href=/documentation/transforms/java/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/java/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/java/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/java/aggregation/min/>Min</a></li><li><a href=/documentation/transforms/java/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/java/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/java/aggregation/top/>Top</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/other/create/>Create</a></li><li><a href=/documentation/transforms/java/other/flatten/>Flatten</a></li><li><a 
href=/documentation/transforms/java/other/passert/>PAssert</a></li><li><a href=/documentation/transforms/java/other/view/>View</a></li><li><a href=/documentation/transforms/java/other/window/>Window</a></li></ul></li></ul></li></ul></li><li><a href=/documentation/glossary/>Glossary</a></li><li><a href=https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam>Beam Wiki <img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></li></ul></nav></div><nav class="page-nav clearfix" data-offset-top=90 data-offset-bottom=500><nav id=TableOfContents><ul><li><ul><li><a href=#reading-using-hadoopformatio>Reading using HadoopFormatIO</a><ul><li><a href=#read-data-only-with-hadoop-configuration>Read data only with Hadoop configuration.</a></li><li><a href=#read-data-with-configuration-and-key-translation>Read data with configuration and key translation</a></li><li><a href=#read-data-with-configuration-and-value-translation>Read data with configuration and value translation</a></li><li><a href=#read-data-with-configuration-value-translation-and-key-translation>Read data with configuration, value translation and key translation</a></li></ul></li></ul></li></ul><ul><li><ul><li><a href=#cassandra---cqlinputformat>Cassandra - CqlInputFormat</a></li><li><a href=#elasticsearch---esinputformat>Elasticsearch - EsInputFormat</a></li><li><a href=#hcatalog---hcatinputformat>HCatalog - HCatInputFormat</a></li><li><a href=#amazon-dynamodb---dynamodbinputformat>Amazon DynamoDB - DynamoDBInputFormat</a></li><li><a href=#apache-hbase---tablesnapshotinputformat>Apache HBase - TableSnapshotInputFormat</a></li><li><a href=#writing-using-hadoopformatio>Writing using HadoopFormatIO</a><ul><li><a href=#batch-writing>Batch writing</a></li><li><a href=#stream-writing>Stream writing</a></li></ul></li></ul></li></ul></nav></nav><div class="body__contained body__section-nav arrow-list arrow-list--no-mt"><h1 id=hadoop-inputoutput-format-io>Hadoop Input/Output Format 
IO</h1><blockquote><p><strong>IMPORTANT!</strong> The previous implementation of Hadoop Input Format IO, called <code>HadoopInputFormatIO</code>, has been deprecated since <em>Apache Beam 2.10</em>. Please use the current <code>HadoopFormatIO</code>, which supports both <code>InputFormat</code> and <code>OutputFormat</code>.</p></blockquote><p>A <code>HadoopFormatIO</code> is a transform for reading data from any source or writing data to any sink that implements Hadoop&rsquo;s <code>InputFormat</code> or <code>OutputFormat</code>, respectively. Examples include Cassandra, Elasticsearch, HBase, Redis, and Postgres.</p><p><code>HadoopFormatIO</code> allows you to connect to many data sources/sinks that do not yet have a Beam IO transform. However, <code>HadoopFormatIO</code> has to make several performance trade-offs in connecting to <code>InputFormat</code> or <code>OutputFormat</code>. So, if there is another Beam IO transform for connecting specifically to your data source/sink of choice, we recommend you use that one.</p><h3 id=reading-using-hadoopformatio>Reading using HadoopFormatIO</h3><p>You will need to pass a Hadoop <code>Configuration</code> with parameters specifying how the read will occur. 
Many properties of the <code>Configuration</code> are optional and some are required for certain <code>InputFormat</code> classes, but the following properties must be set for all <code>InputFormat</code> classes:</p><ul><li><code>mapreduce.job.inputformat.class</code> - The <code>InputFormat</code> class used to connect to your data source of choice.</li><li><code>key.class</code> - The <code>Key</code> class returned by the <code>InputFormat</code> in <code>mapreduce.job.inputformat.class</code>.</li><li><code>value.class</code> - The <code>Value</code> class returned by the <code>InputFormat</code> in <code>mapreduce.job.inputformat.class</code>.</li></ul><p>For example:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>Configuration</span> <span class=n>myHadoopConfiguration</span> <span class=o>=</span> <span class=k>new</span> <span class=n>Configuration</span><span class=o>(</span><span class=kc>false</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=c1>// Set Hadoop InputFormat, key and value class in configuration
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>myHadoopConfiguration</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;mapreduce.job.inputformat.class&#34;</span><span class=o>,</span> <span class=n>InputFormatClass</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>InputFormat</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>myHadoopConfiguration</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;key.class&#34;</span><span class=o>,</span> <span class=n>InputFormatKeyClass</span><span class=o>,</span> <span class=n>Object</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>myHadoopConfiguration</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;value.class&#34;</span><span class=o>,</span> <span class=n>InputFormatValueClass</span><span class=o>,</span> <span class=n>Object</span><span class=o>.</span><span class=na>class</span><span class=o>);</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><p>You will need to check if the <code>Key</code> and <code>Value</code> classes output by the <code>InputFormat</code> have a Beam <code>Coder</code> available. If not, you can use <code>withKeyTranslation</code> or <code>withValueTranslation</code> to specify a method transforming instances of those classes into another class that is supported by a Beam <code>Coder</code>. These settings are optional and you don&rsquo;t need to specify translation for both key and value.</p><p>For example:</p><p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>SimpleFunction</span><span class=o>&lt;</span><span class=n>InputFormatKeyClass</span><span class=o>,</span> <span class=n>MyKeyClass</span><span class=o>&gt;</span> <span class=n>myOutputKeyType</span> <span class=o>=</span>
</span></span><span class=line><span class=cl><span class=k>new</span> <span class=n>SimpleFunction</span><span class=o>&lt;</span><span class=n>InputFormatKeyClass</span><span class=o>,</span> <span class=n>MyKeyClass</span><span class=o>&gt;()</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=n>MyKeyClass</span> <span class=nf>apply</span><span class=o>(</span><span class=n>InputFormatKeyClass</span> <span class=n>input</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=c1>// ...logic to transform InputFormatKeyClass to MyKeyClass
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>};</span>
</span></span><span class=line><span class=cl><span class=n>SimpleFunction</span><span class=o>&lt;</span><span class=n>InputFormatValueClass</span><span class=o>,</span> <span class=n>MyValueClass</span><span class=o>&gt;</span> <span class=n>myOutputValueType</span> <span class=o>=</span>
</span></span><span class=line><span class=cl><span class=k>new</span> <span class=n>SimpleFunction</span><span class=o>&lt;</span><span class=n>InputFormatValueClass</span><span class=o>,</span> <span class=n>MyValueClass</span><span class=o>&gt;()</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=n>MyValueClass</span> <span class=nf>apply</span><span class=o>(</span><span class=n>InputFormatValueClass</span> <span class=n>input</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=c1>// ...logic to transform InputFormatValueClass to MyValueClass
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>};</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div></p><h4 id=read-data-only-with-hadoop-configuration>Read data only with Hadoop configuration.</h4><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>p</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;read&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>HadoopFormatIO</span><span class=o>.&lt;</span><span class=n>InputFormatKeyClass</span><span class=o>,</span> <span class=n>InputFormatValueClass</span><span class=o>&gt;</span><span class=n>read</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withConfiguration</span><span class=o>(</span><span class=n>myHadoopConfiguration</span><span class=o>));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><h4 id=read-data-with-configuration-and-key-translation>Read data with configuration and key translation</h4><p>For example, when a Beam <code>Coder</code> is not available for the <code>Key</code> class, key translation is required.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>p</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;read&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>HadoopFormatIO</span><span class=o>.&lt;</span><span class=n>MyKeyClass</span><span class=o>,</span> <span class=n>InputFormatValueClass</span><span class=o>&gt;</span><span class=n>read</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withConfiguration</span><span class=o>(</span><span class=n>myHadoopConfiguration</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withKeyTranslation</span><span class=o>(</span><span class=n>myOutputKeyType</span><span class=o>));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><h4 id=read-data-with-configuration-and-value-translation>Read data with configuration and value translation</h4><p>For example, when a Beam <code>Coder</code> is not available for the <code>Value</code> class, value translation is required.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>p</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;read&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>HadoopFormatIO</span><span class=o>.&lt;</span><span class=n>InputFormatKeyClass</span><span class=o>,</span> <span class=n>MyValueClass</span><span class=o>&gt;</span><span class=n>read</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withConfiguration</span><span class=o>(</span><span class=n>myHadoopConfiguration</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withValueTranslation</span><span class=o>(</span><span class=n>myOutputValueType</span><span class=o>));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><h4 id=read-data-with-configuration-value-translation-and-key-translation>Read data with configuration, value translation and key translation</h4><p>For example, when Beam Coders are not available for either the <code>Key</code> class or the <code>Value</code> class of the <code>InputFormat</code>, both key and value translation are required.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>p</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;read&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>HadoopFormatIO</span><span class=o>.&lt;</span><span class=n>MyKeyClass</span><span class=o>,</span> <span class=n>MyValueClass</span><span class=o>&gt;</span><span class=n>read</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withConfiguration</span><span class=o>(</span><span class=n>myHadoopConfiguration</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withKeyTranslation</span><span class=o>(</span><span class=n>myOutputKeyType</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withValueTranslation</span><span class=o>(</span><span class=n>myOutputValueType</span><span class=o>));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><h1 id=examples-for-specific-inputformats>Examples for specific InputFormats</h1><h3 id=cassandra---cqlinputformat>Cassandra - CqlInputFormat</h3><p>To read data from Cassandra, use <code>org.apache.cassandra.hadoop.cql3.CqlInputFormat</code>, which needs the following properties to be set:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>Configuration</span> <span class=n>cassandraConf</span> <span class=o>=</span> <span class=k>new</span> <span class=n>Configuration</span><span class=o>();</span>
</span></span><span class=line><span class=cl><span class=n>cassandraConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;cassandra.input.thrift.port&#34;</span><span class=o>,</span> <span class=s>&#34;9160&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>cassandraConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;cassandra.input.thrift.address&#34;</span><span class=o>,</span> <span class=n>CassandraHostIp</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>cassandraConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;cassandra.input.partitioner.class&#34;</span><span class=o>,</span> <span class=s>&#34;Murmur3Partitioner&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>cassandraConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;cassandra.input.keyspace&#34;</span><span class=o>,</span> <span class=s>&#34;myKeySpace&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>cassandraConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;cassandra.input.columnfamily&#34;</span><span class=o>,</span> <span class=s>&#34;myColumnFamily&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>cassandraConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;key.class&#34;</span><span class=o>,</span> <span class=n>java</span><span class=o>.</span><span class=na>lang</span><span class=o>.</span><span class=na>Long</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>Object</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>cassandraConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;value.class&#34;</span><span class=o>,</span> <span class=n>com</span><span class=o>.</span><span class=na>datastax</span><span class=o>.</span><span class=na>driver</span><span class=o>.</span><span class=na>core</span><span class=o>.</span><span class=na>Row</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>Object</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>cassandraConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;mapreduce.job.inputformat.class&#34;</span><span class=o>,</span> <span class=n>org</span><span class=o>.</span><span class=na>apache</span><span class=o>.</span><span class=na>cassandra</span><span class=o>.</span><span class=na>hadoop</span><span class=o>.</span><span class=na>cql3</span><span class=o>.</span><span class=na>CqlInputFormat</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>InputFormat</span><span class=o>.</span><span class=na>class</span><span class=o>);</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><p>Call Read transform as follows:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>Long</span><span class=o>,</span> <span class=n>String</span><span class=o>&gt;&gt;</span> <span class=n>cassandraData</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=n>p</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;read&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>HadoopFormatIO</span><span class=o>.&lt;</span><span class=n>Long</span><span class=o>,</span> <span class=n>String</span><span class=o>&gt;</span><span class=n>read</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withConfiguration</span><span class=o>(</span><span class=n>cassandraConf</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withValueTranslation</span><span class=o>(</span><span class=n>cassandraOutputValueType</span><span class=o>));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><p>The <code>CqlInputFormat</code> key class is <code>java.lang.Long</code>, which has a Beam <code>Coder</code>. The <code>CqlInputFormat</code> value class is <code>com.datastax.driver.core.Row</code>, which does not have a Beam <code>Coder</code>. Rather than write a new coder, you can provide your own translation method, as follows:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>SimpleFunction</span><span class=o>&lt;</span><span class=n>Row</span><span class=o>,</span> <span class=n>String</span><span class=o>&gt;</span> <span class=n>cassandraOutputValueType</span> <span class=o>=</span> <span class=k>new</span> <span class=n>SimpleFunction</span><span class=o>&lt;</span><span class=n>Row</span><span class=o>,</span> <span class=n>String</span><span class=o>&gt;()</span>
</span></span><span class=line><span class=cl><span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=n>String</span> <span class=nf>apply</span><span class=o>(</span><span class=n>Row</span> <span class=n>row</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=k>return</span> <span class=n>row</span><span class=o>.</span><span class=na>getString</span><span class=o>(</span><span class=s>&#34;myColName&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>};</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><h3 id=elasticsearch---esinputformat>Elasticsearch - EsInputFormat</h3><p>To read data from Elasticsearch, use <code>EsInputFormat</code>, which needs the following properties to be set:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>Configuration</span> <span class=n>elasticsearchConf</span> <span class=o>=</span> <span class=k>new</span> <span class=n>Configuration</span><span class=o>();</span>
</span></span><span class=line><span class=cl><span class=n>elasticsearchConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;es.nodes&#34;</span><span class=o>,</span> <span class=n>ElasticsearchHostIp</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>elasticsearchConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;es.port&#34;</span><span class=o>,</span> <span class=s>&#34;9200&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>elasticsearchConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;es.resource&#34;</span><span class=o>,</span> <span class=s>&#34;ElasticIndexName/ElasticTypeName&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>elasticsearchConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;key.class&#34;</span><span class=o>,</span> <span class=n>org</span><span class=o>.</span><span class=na>apache</span><span class=o>.</span><span class=na>hadoop</span><span class=o>.</span><span class=na>io</span><span class=o>.</span><span class=na>Text</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>Object</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>elasticsearchConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;value.class&#34;</span><span class=o>,</span> <span class=n>org</span><span class=o>.</span><span class=na>elasticsearch</span><span class=o>.</span><span class=na>hadoop</span><span class=o>.</span><span class=na>mr</span><span class=o>.</span><span class=na>LinkedMapWritable</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>Object</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>elasticsearchConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;mapreduce.job.inputformat.class&#34;</span><span class=o>,</span> <span class=n>org</span><span class=o>.</span><span class=na>elasticsearch</span><span class=o>.</span><span class=na>hadoop</span><span class=o>.</span><span class=na>mr</span><span class=o>.</span><span class=na>EsInputFormat</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>InputFormat</span><span class=o>.</span><span class=na>class</span><span class=o>);</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><p>Call Read transform as follows:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>Text</span><span class=o>,</span> <span class=n>LinkedMapWritable</span><span class=o>&gt;&gt;</span> <span class=n>elasticData</span> <span class=o>=</span> <span class=n>p</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;read&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>HadoopFormatIO</span><span class=o>.&lt;</span><span class=n>Text</span><span class=o>,</span> <span class=n>LinkedMapWritable</span><span class=o>&gt;</span><span class=n>read</span><span class=o>().</span><span class=na>withConfiguration</span><span class=o>(</span><span class=n>elasticsearchConf</span><span class=o>));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><p>The <code>org.elasticsearch.hadoop.mr.EsInputFormat</code>&rsquo;s <code>EsInputFormat</code> key class is <code>org.apache.hadoop.io.Text</code> <code>Text</code>, and its value class is <code>org.elasticsearch.hadoop.mr.LinkedMapWritable</code> <code>LinkedMapWritable</code>. Both key and value classes have Beam Coders.</p><h3 id=hcatalog---hcatinputformat>HCatalog - HCatInputFormat</h3><p>To read data using HCatalog, use <code>org.apache.hive.hcatalog.mapreduce.HCatInputFormat</code>, which needs the following properties to be set:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>Configuration</span> <span class=n>hcatConf</span> <span class=o>=</span> <span class=k>new</span> <span class=n>Configuration</span><span class=o>();</span>
</span></span><span class=line><span class=cl><span class=n>hcatConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;mapreduce.job.inputformat.class&#34;</span><span class=o>,</span> <span class=n>HCatInputFormat</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>InputFormat</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>hcatConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;key.class&#34;</span><span class=o>,</span> <span class=n>LongWritable</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>Object</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>hcatConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;value.class&#34;</span><span class=o>,</span> <span class=n>HCatRecord</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>Object</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>hcatConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;hive.metastore.uris&#34;</span><span class=o>,</span> <span class=s>&#34;thrift://metastore-host:port&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=n>org</span><span class=o>.</span><span class=na>apache</span><span class=o>.</span><span class=na>hive</span><span class=o>.</span><span class=na>hcatalog</span><span class=o>.</span><span class=na>mapreduce</span><span class=o>.</span><span class=na>HCatInputFormat</span><span class=o>.</span><span class=na>setInput</span><span class=o>(</span><span class=n>hcatConf</span><span class=o>,</span> <span class=s>&#34;my_database&#34;</span><span class=o>,</span> <span class=s>&#34;my_table&#34;</span><span class=o>,</span> <span class=s>&#34;my_filter&#34;</span><span class=o>);</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><p>Call Read transform as follows:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>Long</span><span class=o>,</span> <span class=n>HCatRecord</span><span class=o>&gt;&gt;</span> <span class=n>hcatData</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=n>p</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;read&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>HadoopFormatIO</span><span class=o>.&lt;</span><span class=n>Long</span><span class=o>,</span> <span class=n>HCatRecord</span><span class=o>&gt;</span><span class=n>read</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withConfiguration</span><span class=o>(</span><span class=n>hcatConf</span><span class=o>));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><h3 id=amazon-dynamodb---dynamodbinputformat>Amazon DynamoDB - DynamoDBInputFormat</h3><p>To read data from Amazon DynamoDB, use <code>org.apache.hadoop.dynamodb.read.DynamoDBInputFormat</code>.
DynamoDBInputFormat implements the older <code>org.apache.hadoop.mapred.InputFormat</code> interface and to make it compatible with HadoopFormatIO which uses the newer abstract class <code>org.apache.hadoop.mapreduce.InputFormat</code>,
a wrapper API is required which acts as an adapter between HadoopFormatIO and DynamoDBInputFormat (or in general any InputFormat implementing <code>org.apache.hadoop.mapred.InputFormat</code>).
The below example uses one such available wrapper API - <a href=https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/MapReduceInputFormatWrapper.java>https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/MapReduceInputFormatWrapper.java</a></p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>Configuration</span> <span class=n>dynamoDBConf</span> <span class=o>=</span> <span class=k>new</span> <span class=n>Configuration</span><span class=o>();</span>
</span></span><span class=line><span class=cl><span class=n>Job</span> <span class=n>job</span> <span class=o>=</span> <span class=n>Job</span><span class=o>.</span><span class=na>getInstance</span><span class=o>(</span><span class=n>dynamoDBConf</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>com</span><span class=o>.</span><span class=na>twitter</span><span class=o>.</span><span class=na>elephantbird</span><span class=o>.</span><span class=na>mapreduce</span><span class=o>.</span><span class=na>input</span><span class=o>.</span><span class=na>MapReduceInputFormatWrapper</span><span class=o>.</span><span class=na>setInputFormat</span><span class=o>(</span><span class=n>org</span><span class=o>.</span><span class=na>apache</span><span class=o>.</span><span class=na>hadoop</span><span class=o>.</span><span class=na>dynamodb</span><span class=o>.</span><span class=na>read</span><span class=o>.</span><span class=na>DynamoDBInputFormat</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>job</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>dynamoDBConf</span> <span class=o>=</span> <span class=n>job</span><span class=o>.</span><span class=na>getConfiguration</span><span class=o>();</span>
</span></span><span class=line><span class=cl><span class=n>dynamoDBConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;key.class&#34;</span><span class=o>,</span> <span class=n>Text</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>WritableComparable</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>dynamoDBConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;value.class&#34;</span><span class=o>,</span> <span class=n>org</span><span class=o>.</span><span class=na>apache</span><span class=o>.</span><span class=na>hadoop</span><span class=o>.</span><span class=na>dynamodb</span><span class=o>.</span><span class=na>DynamoDBItemWritable</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>Writable</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>dynamoDBConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;dynamodb.servicename&#34;</span><span class=o>,</span> <span class=s>&#34;dynamodb&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>dynamoDBConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;dynamodb.input.tableName&#34;</span><span class=o>,</span> <span class=s>&#34;table_name&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>dynamoDBConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;dynamodb.endpoint&#34;</span><span class=o>,</span> <span class=s>&#34;dynamodb.us-west-1.amazonaws.com&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>dynamoDBConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;dynamodb.regionid&#34;</span><span class=o>,</span> <span class=s>&#34;us-west-1&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>dynamoDBConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;dynamodb.throughput.read&#34;</span><span class=o>,</span> <span class=s>&#34;1&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>dynamoDBConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;dynamodb.throughput.read.percent&#34;</span><span class=o>,</span> <span class=s>&#34;1&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>dynamoDBConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;dynamodb.version&#34;</span><span class=o>,</span> <span class=s>&#34;2011-12-05&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>dynamoDBConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=n>DynamoDBConstants</span><span class=o>.</span><span class=na>DYNAMODB_ACCESS_KEY_CONF</span><span class=o>,</span> <span class=s>&#34;aws_access_key&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>dynamoDBConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=n>DynamoDBConstants</span><span class=o>.</span><span class=na>DYNAMODB_SECRET_KEY_CONF</span><span class=o>,</span> <span class=s>&#34;aws_secret_key&#34;</span><span class=o>);</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><p>Call Read transform as follows:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>Text</span><span class=o>,</span> <span class=n>DynamoDBItemWritable</span><span class=o>&gt;&gt;</span> <span class=n>dynamoDBData</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=n>p</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;read&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>HadoopFormatIO</span><span class=o>.&lt;</span><span class=n>Text</span><span class=o>,</span> <span class=n>DynamoDBItemWritable</span><span class=o>&gt;</span><span class=n>read</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withConfiguration</span><span class=o>(</span><span class=n>dynamoDBConf</span><span class=o>));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><h3 id=apache-hbase---tablesnapshotinputformat>Apache HBase - TableSnapshotInputFormat</h3><p>To read data from an HBase table snapshot, use <code>org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat</code>.
Reading from a table snapshot bypasses the HBase region servers, instead reading HBase data files directly from the filesystem.
This is useful for cases such as reading historical data or offloading of work from the HBase cluster.
There are scenarios when this may prove faster than accessing content through the region servers using the <code>HBaseIO</code>.</p><p>A table snapshot can be taken using the HBase shell or programmatically:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=k>try</span> <span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>Connection</span> <span class=n>connection</span> <span class=o>=</span> <span class=n>ConnectionFactory</span><span class=o>.</span><span class=na>createConnection</span><span class=o>(</span><span class=n>hbaseConf</span><span class=o>);</span>
</span></span><span class=line><span class=cl> <span class=n>Admin</span> <span class=n>admin</span> <span class=o>=</span> <span class=n>connection</span><span class=o>.</span><span class=na>getAdmin</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=n>admin</span><span class=o>.</span><span class=na>snapshot</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;my_snapshot&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>TableName</span><span class=o>.</span><span class=na>valueOf</span><span class=o>(</span><span class=s>&#34;my_table&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=n>HBaseProtos</span><span class=o>.</span><span class=na>SnapshotDescription</span><span class=o>.</span><span class=na>Type</span><span class=o>.</span><span class=na>FLUSH</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><p>A <code>TableSnapshotInputFormat</code> is configured as follows:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=c1>// Construct a typical HBase scan
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>Scan</span> <span class=n>scan</span> <span class=o>=</span> <span class=k>new</span> <span class=n>Scan</span><span class=o>();</span>
</span></span><span class=line><span class=cl><span class=n>scan</span><span class=o>.</span><span class=na>setCaching</span><span class=o>(</span><span class=n>1000</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>scan</span><span class=o>.</span><span class=na>setBatch</span><span class=o>(</span><span class=n>1000</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>scan</span><span class=o>.</span><span class=na>addColumn</span><span class=o>(</span><span class=n>Bytes</span><span class=o>.</span><span class=na>toBytes</span><span class=o>(</span><span class=s>&#34;CF&#34;</span><span class=o>),</span> <span class=n>Bytes</span><span class=o>.</span><span class=na>toBytes</span><span class=o>(</span><span class=s>&#34;col_1&#34;</span><span class=o>));</span>
</span></span><span class=line><span class=cl><span class=n>scan</span><span class=o>.</span><span class=na>addColumn</span><span class=o>(</span><span class=n>Bytes</span><span class=o>.</span><span class=na>toBytes</span><span class=o>(</span><span class=s>&#34;CF&#34;</span><span class=o>),</span> <span class=n>Bytes</span><span class=o>.</span><span class=na>toBytes</span><span class=o>(</span><span class=s>&#34;col_2&#34;</span><span class=o>));</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=n>Configuration</span> <span class=n>hbaseConf</span> <span class=o>=</span> <span class=n>HBaseConfiguration</span><span class=o>.</span><span class=na>create</span><span class=o>();</span>
</span></span><span class=line><span class=cl><span class=n>hbaseConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=n>HConstants</span><span class=o>.</span><span class=na>ZOOKEEPER_QUORUM</span><span class=o>,</span> <span class=s>&#34;zk1:2181&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>hbaseConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;hbase.rootdir&#34;</span><span class=o>,</span> <span class=s>&#34;/hbase&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>hbaseConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;mapreduce.job.inputformat.class&#34;</span><span class=o>,</span> <span class=n>TableSnapshotInputFormat</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>InputFormat</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>hbaseConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;key.class&#34;</span><span class=o>,</span> <span class=n>ImmutableBytesWritable</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>Writable</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>hbaseConf</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;value.class&#34;</span><span class=o>,</span> <span class=n>Result</span><span class=o>.</span><span class=na>class</span><span class=o>,</span> <span class=n>Writable</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>ClientProtos</span><span class=o>.</span><span class=na>Scan</span> <span class=n>proto</span> <span class=o>=</span> <span class=n>ProtobufUtil</span><span class=o>.</span><span class=na>toScan</span><span class=o>(</span><span class=n>scan</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>hbaseConf</span><span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=n>TableInputFormat</span><span class=o>.</span><span class=na>SCAN</span><span class=o>,</span> <span class=n>Base64</span><span class=o>.</span><span class=na>encodeBytes</span><span class=o>(</span><span class=n>proto</span><span class=o>.</span><span class=na>toByteArray</span><span class=o>()));</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=c1>// Make use of existing utility methods
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>Job</span> <span class=n>job</span> <span class=o>=</span> <span class=n>Job</span><span class=o>.</span><span class=na>getInstance</span><span class=o>(</span><span class=n>hbaseConf</span><span class=o>);</span> <span class=c1>// creates internal clone of hbaseConf
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>TableSnapshotInputFormat</span><span class=o>.</span><span class=na>setInput</span><span class=o>(</span><span class=n>job</span><span class=o>,</span> <span class=s>&#34;my_snapshot&#34;</span><span class=o>,</span> <span class=k>new</span> <span class=n>Path</span><span class=o>(</span><span class=s>&#34;/tmp/snapshot_restore&#34;</span><span class=o>));</span>
</span></span><span class=line><span class=cl><span class=n>hbaseConf</span> <span class=o>=</span> <span class=n>job</span><span class=o>.</span><span class=na>getConfiguration</span><span class=o>();</span> <span class=c1>// extract the modified clone
</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><p>Call Read transform as follows:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>ImmutableBytesWritable</span><span class=o>,</span> <span class=n>Result</span><span class=o>&gt;&gt;</span> <span class=n>hbaseSnapshotData</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=n>p</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;read&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>HadoopFormatIO</span><span class=o>.&lt;</span><span class=n>ImmutableBytesWritable</span><span class=o>,</span> <span class=n>Result</span><span class=o>&gt;</span><span class=n>read</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withConfiguration</span><span class=o>(</span><span class=n>hbaseConf</span><span class=o>));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><h3 id=writing-using-hadoopformatio>Writing using HadoopFormatIO</h3><p>You will need to pass a Hadoop <code>Configuration</code> with parameters specifying how the write will occur. Many properties of the <code>Configuration</code> are optional, and some are required for certain <code>OutputFormat</code> classes, but the following properties must be set for all <code>OutputFormat</code>s:</p><ul><li><code>mapreduce.job.id</code> - The identifier of the write job. E.g.: end timestamp of window.</li><li><code>mapreduce.job.outputformat.class</code> - The <code>OutputFormat</code> class used to connect to your data sink of choice.</li><li><code>mapreduce.job.output.key.class</code> - The key class passed to the <code>OutputFormat</code> in <code>mapreduce.job.outputformat.class</code>.</li><li><code>mapreduce.job.output.value.class</code> - The value class passed to the <code>OutputFormat</code> in <code>mapreduce.job.outputformat.class</code>.</li><li><code>mapreduce.job.reduces</code> - Number of reduce tasks. Value is equal to number of write tasks which will be generated. 
This property is not required for <code>Write.PartitionedWriterBuilder#withoutPartitioning()</code> write.</li><li><code>mapreduce.job.partitioner.class</code> - Hadoop partitioner class which will be used for distributing of records among partitions. This property is not required for <code>Write.PartitionedWriterBuilder#withoutPartitioning()</code> write.</li></ul><p><em>Note</em>: All mentioned values have appropriate constants. E.g.: <code>HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR</code>.</p><p>For example:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>Configuration</span> <span class=n>myHadoopConfiguration</span> <span class=o>=</span> <span class=k>new</span> <span class=n>Configuration</span><span class=o>(</span><span class=kc>false</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=c1>// Set Hadoop OutputFormat, key and value class in configuration
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>myHadoopConfiguration</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;mapreduce.job.outputformat.class&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>MyDbOutputFormatClass</span><span class=o>,</span> <span class=n>OutputFormat</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>myHadoopConfiguration</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;mapreduce.job.output.key.class&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>MyDbOutputFormatKeyClass</span><span class=o>,</span> <span class=n>Object</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>myHadoopConfiguration</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;mapreduce.job.output.value.class&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>MyDbOutputFormatValueClass</span><span class=o>,</span> <span class=n>Object</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>myHadoopConfiguration</span><span class=o>.</span><span class=na>setClass</span><span class=o>(</span><span class=s>&#34;mapreduce.job.partitioner.class&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>MyPartitionerClass</span><span class=o>,</span> <span class=n>Object</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>myHadoopConfiguration</span><span class=o>.</span><span class=na>setInt</span><span class=o>(</span><span class=s>&#34;mapreduce.job.reduces&#34;</span><span class=o>,</span> <span class=n>2</span><span class=o>);</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><p>You will need to set <code>OutputFormat</code> key and value class (i.e. &ldquo;mapreduce.job.output.key.class&rdquo; and &ldquo;mapreduce.job.output.value.class&rdquo;) in Hadoop <code>Configuration</code> which are equal to <code>KeyT</code> and <code>ValueT</code>. If you set different <code>OutputFormat</code> key or value class than <code>OutputFormat</code>&rsquo;s actual key or value class then, it will throw <code>IllegalArgumentException</code>.</p><h4 id=batch-writing>Batch writing</h4><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=c1>// Data which will we want to write
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>Text</span><span class=o>,</span> <span class=n>LongWritable</span><span class=o>&gt;&gt;</span> <span class=n>boundedWordsCount</span> <span class=o>=</span> <span class=o>...;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=c1>// Hadoop configuration for write
</span></span></span><span class=line><span class=cl><span class=c1>// We have partitioned write, so Partitioner and reducers count have to be set - see withPartitioning() javadoc
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>Configuration</span> <span class=n>myHadoopConfiguration</span> <span class=o>=</span> <span class=o>...;</span>
</span></span><span class=line><span class=cl><span class=c1>// Path to directory with locks
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>String</span> <span class=n>locksDirPath</span> <span class=o>=</span> <span class=o>...;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=n>boundedWordsCount</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;writeBatch&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>HadoopFormatIO</span><span class=o>.&lt;</span><span class=n>Text</span><span class=o>,</span> <span class=n>LongWritable</span><span class=o>&gt;</span><span class=n>write</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withConfiguration</span><span class=o>(</span><span class=n>myHadoopConfiguration</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withPartitioning</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withExternalSynchronization</span><span class=o>(</span><span class=k>new</span> <span class=n>HDFSSynchronization</span><span class=o>(</span><span class=n>locksDirPath</span><span class=o>)));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><h4 id=stream-writing>Stream writing</h4><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=c1>// Data which we want to write
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>Text</span><span class=o>,</span> <span class=n>LongWritable</span><span class=o>&gt;&gt;</span> <span class=n>unboundedWordsCount</span> <span class=o>=</span> <span class=o>...;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=c1>// Transformation which transforms data of one window into one Hadoop configuration
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>PTransform</span><span class=o>&lt;</span><span class=n>PCollection</span><span class=o>&lt;?</span> <span class=kd>extends</span> <span class=n>KV</span><span class=o>&lt;</span><span class=n>Text</span><span class=o>,</span> <span class=n>LongWritable</span><span class=o>&gt;&gt;,</span> <span class=n>PCollectionView</span><span class=o>&lt;</span><span class=n>Configuration</span><span class=o>&gt;&gt;</span>
</span></span><span class=line><span class=cl> <span class=n>configTransform</span> <span class=o>=</span> <span class=o>...;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=n>unboundedWordsCount</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;writeStream&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>HadoopFormatIO</span><span class=o>.&lt;</span><span class=n>Text</span><span class=o>,</span> <span class=n>LongWritable</span><span class=o>&gt;</span><span class=n>write</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withConfigurationTransform</span><span class=o>(</span><span class=n>configTransform</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withExternalSynchronization</span><span class=o>(</span><span class=k>new</span> <span class=n>HDFSSynchronization</span><span class=o>(</span><span class=n>locksDirPath</span><span class=o>)));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># The Beam SDK for Python does not support Hadoop Input/Output Format IO.</span></span></span></code></pre></div></div></div><div class=feedback><p class=update>Last updated on 2024/05/11</p><h3>Have you found everything you were looking for?</h3><p class=description>Was it all useful and clear? Is there anything that you would like to change? 
Let us know!</p><button class=load-button><a href="https://docs.google.com/forms/d/e/1FAIpQLSfID7abne3GE6k6RdJIyZhPz2Gef7UkpggUEhTIDjjplHuxSA/viewform?usp=header_link" target=_blank>SEND FEEDBACK</a></button></div></div></div><footer class=footer><div class=footer__contained><div class=footer__cols><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col__logo><img src=/images/beam_logo_circle.svg class=footer__logo alt="Beam logo"></div><div class=footer__cols__col__logo><img src=/images/apache_logo_circle.svg class=footer__logo alt="Apache logo"></div></div><div class=footer-wrapper><div class=wrapper-grid><div class=footer__cols__col><div class=footer__cols__col__title>Start</div><div class=footer__cols__col__link><a href=/get-started/beam-overview/>Overview</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-java/>Quickstart (Java)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-py/>Quickstart (Python)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-go/>Quickstart (Go)</a></div><div class=footer__cols__col__link><a href=/get-started/downloads/>Downloads</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Docs</div><div class=footer__cols__col__link><a href=/documentation/programming-guide/>Concepts</a></div><div class=footer__cols__col__link><a href=/documentation/pipelines/design-your-pipeline/>Pipelines</a></div><div class=footer__cols__col__link><a href=/documentation/runners/capability-matrix/>Runners</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Community</div><div class=footer__cols__col__link><a href=/contribute/>Contribute</a></div><div class=footer__cols__col__link><a href=https://projects.apache.org/committee.html?beam target=_blank>Team<img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></div><div class=footer__cols__col__link><a 
href=/community/presentation-materials/>Media</a></div><div class=footer__cols__col__link><a href=/community/in-person/>Events/Meetups</a></div><div class=footer__cols__col__link><a href=/community/contact-us/>Contact Us</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Resources</div><div class=footer__cols__col__link><a href=/blog/>Blog</a></div><div class=footer__cols__col__link><a href=https://github.com/apache/beam>GitHub</a></div></div></div><div class=footer__bottom>&copy;
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://github.com/apache/beam><img src=/images/logos/social-icons/github-logo-150.png class=footer__logo alt="GitHub logo"></a></div><div class=footer__cols__col__logo><a href=https://www.linkedin.com/company/apache-beam/><img src=/images/logos/social-icons/linkedin-logo-150.png class=footer__logo alt="LinkedIn logo"></a></div></div><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://twitter.com/apachebeam><img src=/images/logos/social-icons/twitter-logo-150.png class=footer__logo alt="Twitter logo"></a></div><div class=footer__cols__col__logo><a href=https://www.youtube.com/channel/UChNnb_YO_7B0HlW6FhAXZZQ><img src=/images/logos/social-icons/youtube-logo-150.png class=footer__logo alt="YouTube logo"></a></div></div></div></div></div></footer></body></html>