<!-- blob: ecf3bb17f90724f668f3a96ef99b32859abaacd8 [file] [log] [blame] -->
<!doctype html><html lang=en class=no-js><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><title>Google BigQuery I/O connector</title><meta name=description content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes."><link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400,500,700" rel=stylesheet><link rel=preload href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css as=style><link href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css rel=stylesheet integrity><script src=https://code.jquery.com/jquery-2.2.4.min.js></script><style>.body__contained img{max-width:100%}</style><script type=text/javascript src=/js/bootstrap.min.2979f9a6e32fc42c3e7406339ee9fe76b31d1b52059776a02b4a7fa6a4fd280a.js defer></script>
<!-- Fingerprinted site script bundles. Every one is loaded with `defer`; the
     explicit type is omitted because JavaScript is the default for <script>. -->
<script src="/js/language-switch-v2.min.121952b7980b920320ab229551857669209945e39b05ba2b433a565385ca44c6.js" defer></script>
<script src="/js/fix-menu.min.039174b67107465f2090a493f91e126f7aa797f29420f9edab8a54d9dd4b3d2d.js" defer></script>
<script src="/js/section-nav.min.1405fd5e70fab5f6c54037c269b1d137487d8f3d1b3009032525f6db3fbce991.js" defer></script>
<script src="/js/page-nav.min.af231204c9c52c5089d53a4c02739eacbb7f939e3be1c6ffcc212e0ac4dbf879.js" defer></script>
<script src="/js/expandable-list.min.75a4526624a3b8898fe7fb9e3428c205b581f8b38c7926922467aef17eac69f2.js" defer></script>
<script src="/js/copy-to-clipboard.min.364c06423d7e8993fc42bb4abc38c03195bc8386db26d18774ce775d08d5b18d.js" defer></script>
<script src="/js/calendar.min.336664054fa0f52b08bbd4e3c59b5cb6d63dcfb2b4d602839746516b0817446b.js" defer></script>
<script src="/js/fix-playground-nested-scroll.min.0283f1037cb1b9d5074c6eaf041292b524a8148a7cdb803d5ccd6d1fc4eb3253.js" defer></script>
<script src="/js/anchor-content-jump-fix.min.22d3240f81632e4c11179b9d2aaf37a40da9414333c43aa97344e8b21a7df0e4.js" defer></script>
<link rel=alternate type=application/rss+xml title="Apache Beam" href=/feed.xml><link rel=canonical href=/documentation/io/built-in/google-bigquery/ data-proofer-ignore><link rel="shortcut icon" type=image/x-icon href=/images/favicon.ico><link rel=stylesheet href=https://use.fontawesome.com/releases/v5.4.1/css/all.css integrity=sha384-5sAR7xN1Nv6T6+dT2mhtzEpVJvfS3NScPQTrOxhwjIuvcA67KV2R5Jz6kr4abQsz crossorigin=anonymous><link rel=stylesheet href=https://unpkg.com/swiper@8/swiper-bundle.min.css><script async src=https://platform.twitter.com/widgets.js></script>
<script>
  // Google Analytics bootstrap: creates the global `ga` command queue, then
  // injects analytics.js asynchronously in front of the first <script> element.
  // The library URL is pinned to https: (it used to be protocol-relative) so the
  // fetch never inherits a non-TLS or file: scheme from the embedding page.
  (function (win, doc, tagName, libraryUrl, queueName, loader, firstScript) {
    win.GoogleAnalyticsObject = queueName;
    win[queueName] = win[queueName] || function () {
      // Calls made before the library arrives are buffered on `.q`.
      (win[queueName].q = win[queueName].q || []).push(arguments);
    };
    win[queueName].l = 1 * new Date();
    loader = doc.createElement(tagName);
    firstScript = doc.getElementsByTagName(tagName)[0];
    loader.async = 1;
    loader.src = libraryUrl;
    firstScript.parentNode.insertBefore(loader, firstScript);
  })(window, document, "script", "https://www.google-analytics.com/analytics.js", "ga");
  ga("create", "UA-73650088-1", "auto");
  ga("send", "pageview");
</script>
<script>
  // Hotjar bootstrap: creates the global `hj` queue, records the site settings
  // and appends the async loader script to <head>.
  (function (win, doc, urlPrefix, urlSuffix, head, loader) {
    win.hj = win.hj || function () {
      (win.hj.q = win.hj.q || []).push(arguments);
    };
    win._hjSettings = { hjid: 2182187, hjsv: 6 };
    head = doc.getElementsByTagName("head")[0];
    loader = doc.createElement("script");
    loader.async = 1;
    // Resulting URL: <prefix><hjid><suffix><hjsv>
    loader.src = urlPrefix + win._hjSettings.hjid + urlSuffix + win._hjSettings.hjsv;
    head.appendChild(loader);
  })(window, document, "https://static.hotjar.com/c/hotjar-", ".js?sv=");
</script>
</head><body class=body data-spy=scroll data-target=.page-nav data-offset=0><nav class="navigation-bar-mobile header navbar navbar-fixed-top"><div class=navbar-header><a href=/ class=navbar-brand><img alt=Brand style=height:46px;width:43px src=/images/beam_logo_navbar_mobile.png></a>
<a class="navbar-link" href="/get-started/">Get Started</a>
<a class="navbar-link" href="/documentation/">Documentation</a>
<button type=button class="navbar-toggle menu-open" aria-expanded=false aria-controls=navbar onclick=openMenu()>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button></div><div class="navbar-mask closed"></div><div id=navbar class="navbar-container closed"><button type=button class=navbar-toggle aria-expanded=false aria-controls=navbar id=closeMenu>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button><ul class="nav navbar-nav"><li><div class=searchBar-mobile><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search></div></li><li><a class=navbar-link href=/about>About</a></li><li><a class=navbar-link href=/get-started/>Get Started</a></li><li><span class=navbar-link>Documentation</span><ul><li><a href=/documentation/>General</a></li><li><a href=/documentation/sdks/java/>Languages</a></li><li><a href=/documentation/runners/capability-matrix/>Runners</a></li><li><a href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><li><a class=navbar-link href=/roadmap/>Roadmap</a></li><li><a class=navbar-link href=/community/>Community</a></li><li><a class=navbar-link href=/contribute/>Contribute</a></li><li><a class=navbar-link href=/blog/>Blog</a></li><li><a class=navbar-link href=/case-studies/>Case Studies</a></li></ul><ul class="nav navbar-nav navbar-right"><li><!--
  Icon-only link to the GitHub editor for this page's Markdown source. The
  aria-label supplies the accessible name the pencil icon cannot provide; the
  SVG itself is hidden from assistive technology as purely decorative.
--><a href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/io/built-in/google-bigquery.md aria-label="Edit this page on GitHub" data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24" aria-hidden="true"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a></li><li class=dropdown><a href=# class=dropdown-toggle id=apache-dropdown data-toggle=dropdown role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><!--
  Mobile "Apache" dropdown: links to apache.org pages, each opened in a new tab.
  rel=noopener is stated explicitly so the opened page never receives a
  window.opener handle, including in browsers that do not imply it.
--><ul class="dropdown-menu dropdown-menu-right"><li><a target=_blank rel=noopener href=https://www.apache.org/>ASF Homepage</a></li><li><a target=_blank rel=noopener href=https://www.apache.org/licenses/>License</a></li><li><a target=_blank rel=noopener href=https://www.apache.org/security/>Security</a></li><li><a target=_blank rel=noopener href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a target=_blank rel=noopener href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a target=_blank rel=noopener href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></ul></div></nav><nav class=navigation-bar-desktop><a href=/ class=navbar-logo><img src=/images/beam_logo_navbar.png alt="Beam Logo"></a><div class=navbar-bar-left><div class=navbar-links><a class=navbar-link href=/about>About</a>
<a class="navbar-link" href="/get-started/">Get Started</a><!--
  Desktop "Documentation" dropdown: a toggle link followed by its menu of
  documentation sections. Line breaks are kept inside tags and comments only, so
  no extra whitespace text nodes are introduced between the inline nav items.
  NOTE(review): this <li> is a direct child of a <div>, not of a <ul>/<ol>. It is
  left untouched here because styles or scripts may target it; confirm before changing.
--><li class="dropdown navbar-dropdown navbar-dropdown-documentation"><a
  class="dropdown-toggle navbar-link" href="#" role="button" aria-haspopup="true" aria-expanded="false">Documentation
<span><svg xmlns="http://www.w3.org/2000/svg" width="12" height="11" fill="none" viewBox="0 0 12 11"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10.666 4.535 5.847 9.108 1.444 4.535"/></svg></span></a><ul
  class="dropdown-menu"><li><a
    class="navbar-dropdown-menu-link" href="/documentation/">General</a></li><li><a
    class="navbar-dropdown-menu-link" href="/documentation/sdks/java/">Languages</a></li><li><a
    class="navbar-dropdown-menu-link" href="/documentation/runners/capability-matrix/">Runners</a></li><li><a
    class="navbar-dropdown-menu-link" href="/documentation/io/connectors/">I/O Connectors</a></li></ul></li><a class="navbar-link" href="/roadmap/">Roadmap</a>
<a class="navbar-link" href="/community/">Community</a>
<a class="navbar-link" href="/contribute/">Contribute</a>
<a class="navbar-link" href="/blog/">Blog</a>
<a class=navbar-link href=/case-studies/>Case Studies</a></div><div id=iconsBar><!--
  Search trigger. It is an <a> with no href, so on its own it is neither
  focusable nor announced as a control. role/tabindex/aria-label expose it as a
  button named "Search", and the keydown handler gives Enter and Space the same
  effect as a click (showSearch()). The element and its type attribute are kept
  so existing selectors continue to match.
--><a type=button role=button tabindex=0 aria-label="Search" onclick=showSearch() onkeydown="if(event.key==='Enter'||event.key===' '){event.preventDefault();showSearch()}"><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24" aria-hidden="true"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M10.191 17c3.866.0 7-3.134 7-7s-3.134-7-7-7-7 3.134-7 7 3.134 7 7 7zm11 4-6-6"/></svg></a><!--
  Icon-only link to the GitHub editor for this page's source; opens in a new tab.
  aria-label provides the accessible name and rel=noopener withholds window.opener.
--><a target=_blank rel=noopener href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/io/built-in/google-bigquery.md aria-label="Edit this page on GitHub" data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24" aria-hidden="true"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a><li class="dropdown navbar-dropdown navbar-dropdown-apache"><a href=# class=dropdown-toggle role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><!--
  Desktop "Apache" dropdown: links to apache.org pages, each opened in a new tab.
  rel=noopener is stated explicitly so the opened page never receives a
  window.opener handle, including in browsers that do not imply it.
--><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link target=_blank rel=noopener href=https://www.apache.org/>ASF Homepage</a></li><li><a class=navbar-dropdown-menu-link target=_blank rel=noopener href=https://www.apache.org/licenses/>License</a></li><li><a class=navbar-dropdown-menu-link target=_blank rel=noopener href=https://www.apache.org/security/>Security</a></li><li><a class=navbar-dropdown-menu-link target=_blank rel=noopener href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a class=navbar-dropdown-menu-link target=_blank rel=noopener href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a class=navbar-dropdown-menu-link target=_blank rel=noopener href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></div><div class="searchBar disappear"><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search>
<!--
  Close-search control. It is an <a> with no href, so on its own it is neither
  focusable nor announced as a control. role/tabindex/aria-label expose it as a
  button named "Close search", and the keydown handler gives Enter and Space the
  same effect as a click (endSearch()). The element and its type attribute are
  kept so existing selectors continue to match.
--><a type=button role=button tabindex=0 aria-label="Close search" onclick=endSearch() onkeydown="if(event.key==='Enter'||event.key===' '){event.preventDefault();endSearch()}"><svg xmlns="http://www.w3.org/2000/svg" width="25" height="25" fill="none" viewBox="0 0 25 25" aria-hidden="true"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M21.122 20.827 4.727 4.432M21.122 4.43 4.727 20.827"/></svg></a></div></div></nav><div class=header-push></div><div class="top-banners swiper"><div class=swiper-wrapper><div class=swiper-slide><a href=https://tour.beam.apache.org><img class=banner-img-desktop src=/images/banners/tour-of-beam/tour-of-beam-desktop.png alt="Start Tour of Beam">
<img class=banner-img-mobile src=/images/banners/tour-of-beam/tour-of-beam-mobile.png alt="Start Tour of Beam"></a></div><div class=swiper-slide><a href=https://beam.apache.org/documentation/ml/overview/><img class=banner-img-desktop src=/images/banners/machine-learning/machine-learning-desktop.jpg alt="Machine Learning">
<img class=banner-img-mobile src=/images/banners/machine-learning/machine-learning-mobile.jpg alt="Machine Learning"></a></div></div><div class=swiper-pagination></div><div class=swiper-button-prev></div><div class=swiper-button-next></div></div><script src=/js/swiper-bundle.min.min.e0e8f81b0b15728d35ff73c07f42ddbb17a108d6f23df4953cb3e60df7ade675.js></script>
<!-- Top-banners slider script; loaded synchronously, directly after the Swiper bundle. -->
<script src="/js/sliders/top-banners.min.afa7d0a19acf7a3b28ca369490b3d401a619562a2a4c9612577be2f66a4b9855.js"></script>
<script>
  // Header helpers for the search box and the mobile menu. They are declared as
  // plain globals because the markup calls them from inline onclick attributes.

  // Sets the placeholder text, removes the `disappear` class from the
  // .searchBar element and adds it to #iconsBar.
  function showSearch() {
    addPlaceholder();
    document.querySelector(".searchBar").classList.remove("disappear");
    document.querySelector("#iconsBar").classList.add("disappear");
  }

  // Gives every text input matched by jQuery's "input:text" the search prompt
  // as its placeholder.
  function addPlaceholder() {
    var textInputs = $("input:text");
    textInputs.attr("placeholder", "What are you looking for?");
  }

  // Inverse of showSearch: adds `disappear` to .searchBar and removes it from
  // #iconsBar.
  function endSearch() {
    document.querySelector(".searchBar").classList.add("disappear");
    document.querySelector("#iconsBar").classList.remove("disappear");
  }

  // Toggles the `fixedPosition` class on the body element (presumably the
  // scroll lock; the styling lives in the stylesheet).
  function blockScroll() {
    var pageBody = $("body");
    pageBody.toggleClass("fixedPosition");
  }

  // Click handler of the mobile menu button: sets the placeholder, then toggles
  // the body class.
  function openMenu() {
    addPlaceholder();
    blockScroll();
  }
</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Documentation</span></li><li><a href=/documentation>Using the Documentation</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Concepts</span><ul class=section-nav-list><li><a href=/documentation/basics/>Basics of the Beam model</a></li><li><a href=/documentation/runtime/model/>How Beam executes a pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Beam programming guide</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/>Overview</a></li><li><a href=/documentation/programming-guide/#creating-a-pipeline>Pipelines</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>PCollections</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pcollections>Creating a PCollection</a></li><li><a href=/documentation/programming-guide/#pcollection-characteristics>PCollection characteristics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transforms</span><ul class=section-nav-list><li><a
href=/documentation/programming-guide/#applying-transforms>Applying transforms</a></li><li><span class=section-nav-list-title>Core Beam transforms</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pardo>ParDo</a></li><li><a href=/documentation/programming-guide/#groupbykey>GroupByKey</a></li><li><a href=/documentation/programming-guide/#cogroupbykey>CoGroupByKey</a></li><li><a href=/documentation/programming-guide/#combine>Combine</a></li><li><a href=/documentation/programming-guide/#flatten>Flatten</a></li><li><a href=/documentation/programming-guide/#partition>Partition</a></li></ul></li><li><a href=/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms>Requirements for user code</a></li><li><a href=/documentation/programming-guide/#side-inputs>Side inputs</a></li><li><a href=/documentation/programming-guide/#additional-outputs>Additional outputs</a></li><li><a href=/documentation/programming-guide/#composite-transforms>Composite transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Pipeline I/O</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pipeline-io>Using I/O transforms</a></li><li><a href=/documentation/io/connectors/>I/O connectors</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>I/O connector guides</span><ul class=section-nav-list><li><a href=/documentation/io/built-in/parquet/>Apache Parquet I/O connector</a></li><li><a href=/documentation/io/built-in/hadoop/>Hadoop Input/Output Format IO</a></li><li><a href=/documentation/io/built-in/hcatalog/>HCatalog IO</a></li><li><a href=/documentation/io/built-in/google-bigquery/>Google BigQuery I/O connector</a></li><li><a href=/documentation/io/built-in/snowflake/>Snowflake I/O connector</a></li><li><a href=/documentation/io/built-in/cdap/>CDAP I/O connector</a></li><li><a href=/documentation/io/built-in/sparkreceiver/>Spark Receiver I/O 
connector</a></li><li><a href=/documentation/io/built-in/singlestore/>SingleStoreDB I/O connector</a></li><li><a href=/documentation/io/built-in/webapis/>Web APIs I/O connector</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Developing new I/O connectors</span><ul class=section-nav-list><li><a href=/documentation/io/developing-io-overview/>Overview: Developing connectors</a></li><li><a href=/documentation/io/developing-io-java/>Developing connectors (Java)</a></li><li><a href=/documentation/io/developing-io-python/>Developing connectors (Python)</a></li><li><a href=/documentation/io/io-standards/>I/O Standards</a></li></ul></li><li><a href=/documentation/io/testing/>Testing I/O transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Schemas</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#what-is-a-schema>What is a schema</a></li><li><a href=/documentation/programming-guide/#schemas-for-pl-types>Schemas for programming language types</a></li><li><a href=/documentation/programming-guide/#schema-definition>Schema definition</a></li><li><a href=/documentation/programming-guide/#logical-types>Logical types</a></li><li><a href=/documentation/programming-guide/#creating-schemas>Creating schemas</a></li><li><a href=/documentation/programming-guide/#using-schemas>Using schemas</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data encoding and type safety</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#data-encoding-and-type-safety>Data encoding basics</a></li><li><a href=/documentation/programming-guide/#specifying-coders>Specifying coders</a></li><li><a href=/documentation/programming-guide/#default-coders-and-the-coderregistry>Default coders and the CoderRegistry</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Windowing</span><ul 
class=section-nav-list><li><a href=/documentation/programming-guide/#windowing>Windowing basics</a></li><li><a href=/documentation/programming-guide/#provided-windowing-functions>Provided windowing functions</a></li><li><a href=/documentation/programming-guide/#setting-your-pcollections-windowing-function>Setting your PCollection’s windowing function</a></li><li><a href=/documentation/programming-guide/#watermarks-and-late-data>Watermarks and late data</a></li><li><a href=/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements>Adding timestamps to a PCollection’s elements</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Triggers</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#triggers>Trigger basics</a></li><li><a href=/documentation/programming-guide/#event-time-triggers>Event time triggers and the default trigger</a></li><li><a href=/documentation/programming-guide/#processing-time-triggers>Processing time triggers</a></li><li><a href=/documentation/programming-guide/#data-driven-triggers>Data-driven triggers</a></li><li><a href=/documentation/programming-guide/#setting-a-trigger>Setting a trigger</a></li><li><a href=/documentation/programming-guide/#composite-triggers>Composite triggers</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Metrics</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#metrics>Metrics basics</a></li><li><a href=/documentation/programming-guide/#types-of-metrics>Types of metrics</a></li><li><a href=/documentation/programming-guide/#querying-metrics>Querying metrics</a></li><li><a href=/documentation/programming-guide/#using-metrics>Using metrics in pipeline</a></li><li><a href=/documentation/programming-guide/#export-metrics>Export metrics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>State and Timers</span><ul 
class=section-nav-list><li><a href=/documentation/programming-guide/#types-of-state>Types of state</a></li><li><a href=/documentation/programming-guide/#deferred-state-reads>Deferred state reads</a></li><li><a href=/documentation/programming-guide/#timers>Timers</a></li><li><a href=/documentation/programming-guide/#garbage-collecting-state>Garbage collecting state</a></li><li><a href=/documentation/programming-guide/#state-timers-examples>State and timers examples</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Splittable DoFns</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#sdf-basics>Basics</a></li><li><a href=/documentation/programming-guide/#sizing-and-progress>Sizing and progress</a></li><li><a href=/documentation/programming-guide/#user-initiated-checkpoint>User-initiated checkpoint</a></li><li><a href=/documentation/programming-guide/#runner-initiated-split>Runner initiated split</a></li><li><a href=/documentation/programming-guide/#watermark-estimation>Watermark estimation</a></li><li><a href=/documentation/programming-guide/#truncating-during-drain>Truncating during drain</a></li><li><a href=/documentation/programming-guide/#bundle-finalization>Bundle finalization</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Multi-language Pipelines</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#create-x-lang-transforms>Creating cross-language transforms</a></li><li><a href=/documentation/programming-guide/#use-x-lang-transforms>Using cross-language transforms</a></li><li><a href=/documentation/programming-guide/#x-lang-transform-runner-support>Runner Support</a></li></ul></li><li><a href=/documentation/programming-guide/#batched-dofns>Batched DoFns</a></li><li><a href=/documentation/programming-guide/#transform-service>Transform service</a></li></ul></li><li class=section-nav-item--collapsible><span 
class=section-nav-list-title>Pipeline development lifecycle</span><ul class=section-nav-list><li><a href=/documentation/pipelines/design-your-pipeline/>Design Your Pipeline</a></li><li><a href=/documentation/pipelines/create-your-pipeline/>Create Your Pipeline</a></li><li><a href=/documentation/pipelines/test-your-pipeline/>Test Your Pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Common pipeline patterns</span><ul class=section-nav-list><li><a href=/documentation/patterns/overview/>Overview</a></li><li><a href=/documentation/patterns/file-processing/>File processing</a></li><li><a href=/documentation/patterns/side-inputs/>Side inputs</a></li><li><a href=/documentation/patterns/pipeline-options/>Pipeline options</a></li><li><a href=/documentation/patterns/custom-io/>Custom I/O</a></li><li><a href=/documentation/patterns/custom-windows/>Custom windows</a></li><li><a href=/documentation/patterns/bigqueryio/>BigQueryIO</a></li><li><a href=/documentation/patterns/ai-platform/>AI Platform</a></li><li><a href=/documentation/patterns/schema/>Schema</a></li><li><a href=/documentation/patterns/bqml/>BigQuery ML</a></li><li><a href=/documentation/patterns/grouping-elements-for-efficient-external-service-calls/>Grouping elements for efficient external service calls</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>AI/ML pipelines</span><ul class=section-nav-list><li><a href=/documentation/ml/overview/>Get started with AI/ML</a></li><li><a href=/documentation/ml/about-ml/>About Beam ML</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Prediction and inference</span><ul class=section-nav-list><li><a href=/documentation/ml/inference-overview/>Overview</a></li><li><a href=/documentation/ml/multi-model-pipelines/>Build a pipeline with multiple models</a></li><li><a href=/documentation/ml/tensorrt-runinference>Build a custom model handler with 
TensorRT</a></li><li><a href=/documentation/ml/large-language-modeling>Use LLM inference</a></li><li><a href=/documentation/ml/multi-language-inference/>Build a multi-language inference pipeline</a></li><li><a href=/documentation/ml/side-input-updates/>Update your model in production</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data processing</span><ul class=section-nav-list><li><a href=/documentation/ml/preprocess-data/>Preprocess data</a></li><li><a href=/documentation/ml/data-processing/>Explore your data</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Workflow orchestration</span><ul class=section-nav-list><li><a href=/documentation/ml/orchestration/>Use ML-OPS workflow orchestrators</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Model training</span><ul class=section-nav-list><li><a href=/documentation/ml/per-entity-training>Per-entity training</a></li><li><a href=/documentation/ml/online-clustering/>Online clustering</a></li><li><a href=/documentation/ml/model-evaluation/>ML model evaluation</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Use cases</span><ul class=section-nav-list><li><a href=/documentation/ml/anomaly-detection/>Build an anomaly detection pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Reference</span><ul class=section-nav-list><li><a href=/documentation/ml/runinference-metrics/>RunInference metrics</a></li><li><a href=/documentation/ml/model-evaluation/>Model validation</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Runtime systems</span><ul class=section-nav-list><li><a href=/documentation/runtime/environments/>Container environments</a></li><li><a href=/documentation/runtime/resource-hints/>Resource hints</a></li><li><a 
href=/documentation/runtime/sdk-harness-config/>SDK Harness Configuration</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transform catalog</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Python</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Enrichment</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/enrichment/>Overview</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-bigtable/>Bigtable example</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-vertexai/>Vertex AI Feature Store examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/python/elementwise/flatmap/>FlatMap</a></li><li><a href=/documentation/transforms/python/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/python/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/python/elementwise/map/>Map</a></li><li><a href=/documentation/transforms/python/elementwise/mltransform/>MLTransform</a></li><li><a href=/documentation/transforms/python/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/python/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/python/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/python/elementwise/reify/>Reify</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>RunInference</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/runinference/>Overview</a></li><li><a 
href=/documentation/transforms/python/elementwise/runinference-pytorch/>PyTorch examples</a></li><li><a href=/documentation/transforms/python/elementwise/runinference-sklearn/>Sklearn examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/python/elementwise/values/>Values</a></li><li><a href=/documentation/transforms/python/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/combineglobally/>CombineGlobally</a></li><li><a href=/documentation/transforms/python/aggregation/combineperkey/>CombinePerKey</a></li><li><a href=/documentation/transforms/python/aggregation/combinevalues/>CombineValues</a></li><li><a href=/documentation/transforms/python/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/python/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/python/aggregation/groupby/>GroupBy</a></li><li><a href=/documentation/transforms/python/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/python/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/python/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/python/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/python/aggregation/min/>Min</a></li><li><a href=/documentation/transforms/python/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/python/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/python/aggregation/top/>Top</a></li></ul></li><li class=section-nav-item--collapsible><span 
class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/other/create/>Create</a></li><li><a href=/documentation/transforms/python/other/flatten/>Flatten</a></li><li><a href=/documentation/transforms/python/other/reshuffle/>Reshuffle</a></li><li><a href=/documentation/transforms/python/other/windowinto/>WindowInto</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Java</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/java/elementwise/flatmapelements/>FlatMapElements</a></li><li><a href=/documentation/transforms/java/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/java/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/java/elementwise/mapelements/>MapElements</a></li><li><a href=/documentation/transforms/java/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/java/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/java/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/java/elementwise/reify/>Reify</a></li><li><a href=/documentation/transforms/java/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/java/elementwise/values/>Values</a></li><li><a href=/documentation/transforms/java/elementwise/withkeys/>WithKeys</a></li><li><a href=/documentation/transforms/java/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a 
href=/documentation/transforms/java/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/java/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/java/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/combine/>Combine</a></li><li><a href=/documentation/transforms/java/aggregation/combinewithcontext/>CombineWithContext</a></li><li><a href=/documentation/transforms/java/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/java/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/java/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/java/aggregation/hllcount/>HllCount</a></li><li><a href=/documentation/transforms/java/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/java/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/java/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/java/aggregation/min/>Min</a></li><li><a href=/documentation/transforms/java/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/java/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/java/aggregation/top/>Top</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/other/create/>Create</a></li><li><a href=/documentation/transforms/java/other/flatten/>Flatten</a></li><li><a href=/documentation/transforms/java/other/passert/>PAssert</a></li><li><a href=/documentation/transforms/java/other/view/>View</a></li><li><a href=/documentation/transforms/java/other/window/>Window</a></li></ul></li></ul></li></ul></li><li><a href=/documentation/glossary/>Glossary</a></li><li><a 
href=https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam>Beam Wiki <img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></li></ul></nav></div><nav class="page-nav clearfix" data-offset-top=90 data-offset-bottom=500><nav id=TableOfContents><ul><li><a href=#before-you-start>Before you start</a></li><li><a href=#bigquery-basics>BigQuery basics</a><ul><li><a href=#table-names>Table names</a><ul><li><a href=#using-a-string>Using a string</a></li><li><a href=#using-a-tablereference>Using a TableReference</a></li></ul></li><li><a href=#table-rows>Table rows</a></li><li><a href=#schemas>Schemas</a></li><li><a href=#data-types>Data types</a></li></ul></li><li><a href=#reading-from-bigquery>Reading from BigQuery</a><ul><li><a href=#reading-from-a-table>Reading from a table</a></li><li><a href=#reading-with-a-query-string>Reading with a query string</a></li><li><a href=#storage-api>Using the Storage Read API</a><ul><li><a href=#updating-your-code>Updating your code</a></li></ul></li></ul></li><li><a href=#writing-to-bigquery>Writing to BigQuery</a><ul><li><a href=#create-disposition>Create disposition</a></li><li><a href=#write-disposition>Write disposition</a></li><li><a href=#creating-a-table-schema>Creating a table schema</a><ul><li><a href=#using-a-tableschema>Using a TableSchema</a></li><li><a href=#using-a-string-1>Using a string</a></li></ul></li><li><a href=#setting-the-insertion-method>Setting the insertion method</a></li><li><a href=#writing-to-a-table>Writing to a table</a></li><li><a href=#storage-write-api>Using the Storage Write API</a><ul><li><a href=#exactly-once-semantics>Exactly-once semantics</a></li><li><a href=#at-least-once-semantics>At-least-once semantics</a></li><li><a href=#quotas>Quotas</a></li></ul></li><li><a href=#using-dynamic-destinations>Using dynamic destinations</a></li><li><a href=#using-time-partitioning>Using time partitioning</a></li></ul></li><li><a href=#limitations>Limitations</a></li><li><a 
href=#additional-examples>Additional examples</a><ul><li><a href=#java-cookbook-examples>Java cookbook examples</a></li><li><a href=#java-complete-examples>Java complete examples</a></li><li><a href=#python-cookbook-examples>Python cookbook examples</a></li></ul></li></ul></nav></nav><div class="body__contained body__section-nav arrow-list arrow-list--no-mt"><p><a href=/documentation/io/built-in/>Built-in I/O Transforms</a></p><h1 id=google-bigquery-io-connector>Google BigQuery I/O connector</h1><nav class=language-switcher><strong>Adapt for:</strong><ul><li data-value=java class=active>Java SDK</li><li data-value=py>Python SDK</li></ul></nav><p>The Beam SDKs include built-in transforms that can read data from and write data
to <a href=https://cloud.google.com/bigquery>Google BigQuery</a> tables.</p><h2 id=before-you-start>Before you start</h2><p class=language-java>To use BigQueryIO, add the Maven artifact dependency to your <code>pom.xml</code> file.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=o>&lt;</span><span class=n>dependency</span><span class=o>&gt;</span>
</span></span><span class=line><span class=cl> <span class=o>&lt;</span><span class=n>groupId</span><span class=o>&gt;</span><span class=n>org</span><span class=o>.</span><span class=na>apache</span><span class=o>.</span><span class=na>beam</span><span class=o>&lt;/</span><span class=n>groupId</span><span class=o>&gt;</span>
</span></span><span class=line><span class=cl> <span class=o>&lt;</span><span class=n>artifactId</span><span class=o>&gt;</span><span class=n>beam</span><span class=o>-</span><span class=n>sdks</span><span class=o>-</span><span class=n>java</span><span class=o>-</span><span class=n>io</span><span class=o>-</span><span class=n>google</span><span class=o>-</span><span class=n>cloud</span><span class=o>-</span><span class=n>platform</span><span class=o>&lt;/</span><span class=n>artifactId</span><span class=o>&gt;</span>
</span></span><span class=line><span class=cl> <span class=o>&lt;</span><span class=n>version</span><span class=o>&gt;</span><span class=n>2</span><span class=o>.</span><span class=na>55</span><span class=o>.</span><span class=na>0</span><span class=o>&lt;/</span><span class=n>version</span><span class=o>&gt;</span>
</span></span><span class=line><span class=cl><span class=o>&lt;/</span><span class=n>dependency</span><span class=o>&gt;</span></span></span></code></pre></div></div></div><p class=language-java>Additional resources:</p><span class=language-java><ul><li><a href=https://github.com/apache/beam/tree/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery>BigQueryIO source code</a></li><li><a href=https://beam.apache.org/releases/javadoc/2.55.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html>BigQueryIO Javadoc</a></li><li><a href=https://cloud.google.com/bigquery/docs>Google BigQuery documentation</a></li></ul></span><p class=language-py>To use BigQueryIO, you must install the Google Cloud Platform dependencies by
running <code>pip install apache-beam[gcp]</code>.</p><p class=language-py>Additional resources:</p><span class=language-py><ul><li><a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py>BigQueryIO source code</a></li><li><a href=https://beam.apache.org/releases/pydoc/2.55.0/apache_beam.io.gcp.bigquery.html>BigQueryIO Pydoc</a></li><li><a href=https://cloud.google.com/bigquery/docs>Google BigQuery documentation</a></li></ul></span><h2 id=bigquery-basics>BigQuery basics</h2><h3 id=table-names>Table names</h3><p>To read from or write to a BigQuery table, you must provide a fully-qualified
BigQuery table name (for example, <code>bigquery-public-data:github_repos.sample_contents</code>).
A fully-qualified BigQuery table name consists of three parts:</p><ul><li><strong>Project ID</strong>: The ID for your Google Cloud Project. The default value comes
from your pipeline options object.</li><li><strong>Dataset ID</strong>: The BigQuery dataset ID, which is unique within a given Cloud
Project.</li><li><strong>Table ID</strong>: A BigQuery table ID, which is unique within a given dataset.</li></ul><p>A table name can also include a <a href=https://cloud.google.com/bigquery/table-decorators>table decorator</a>
if you are using <a href=#using-time-partitioning>time-partitioned tables</a>.</p><p>To specify a BigQuery table, you can use either the table&rsquo;s fully-qualified name as
a string, or use a
<span class=language-java><a href=https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/index.html?com/google/api/services/bigquery/model/TableReference.html>TableReference</a></span>
<span class=language-py><a href=https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.bigquery.html#table-references>TableReference</a></span>
object.</p><h4 id=using-a-string>Using a string</h4><p>To specify a table with a string, use the format
<code>[project_id]:[dataset_id].[table_id]</code> to specify the fully-qualified BigQuery
table name.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>String</span> <span class=n>tableSpec</span> <span class=o>=</span> <span class=s>&#34;apache-beam-testing.samples.weather_stations&#34;</span><span class=o>;</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=c1># project-id:dataset_id.table_id</span>
</span></span><span class=line><span class=cl><span class=n>table_spec</span> <span class=o>=</span> <span class=s1>&#39;apache-beam-testing.samples.weather_stations&#39;</span></span></span></code></pre></div></div></div><p>You can also omit <code>project_id</code> and use the <code>[dataset_id].[table_id]</code> format. If
you omit the project ID, Beam uses the default project ID from your
<span class=language-java><a href=https://beam.apache.org/releases/javadoc/2.55.0/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.html>pipeline options</a>.</span>
<span class=language-py><a href=https://beam.apache.org/releases/pydoc/2.55.0/apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.GoogleCloudOptions>pipeline options</a>.</span></p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>String</span> <span class=n>tableSpec</span> <span class=o>=</span> <span class=s>&#34;samples.weather_stations&#34;</span><span class=o>;</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=c1># dataset_id.table_id</span>
</span></span><span class=line><span class=cl><span class=n>table_spec</span> <span class=o>=</span> <span class=s1>&#39;samples.weather_stations&#39;</span></span></span></code></pre></div></div></div><h4 id=using-a-tablereference>Using a TableReference</h4><p>To specify a table with a <code>TableReference</code>, create a new <code>TableReference</code> using
the three parts of the BigQuery table name.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>TableReference</span> <span class=n>tableSpec</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableReference</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setProjectId</span><span class=o>(</span><span class=s>&#34;clouddataflow-readonly&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setDatasetId</span><span class=o>(</span><span class=s>&#34;samples&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setTableId</span><span class=o>(</span><span class=s>&#34;weather_stations&#34;</span><span class=o>);</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=kn>from</span> <span class=nn>apache_beam.io.gcp.internal.clients</span> <span class=kn>import</span> <span class=n>bigquery</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=n>table_spec</span> <span class=o>=</span> <span class=n>bigquery</span><span class=o>.</span><span class=n>TableReference</span><span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>projectId</span><span class=o>=</span><span class=s1>&#39;clouddataflow-readonly&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>datasetId</span><span class=o>=</span><span class=s1>&#39;samples&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>tableId</span><span class=o>=</span><span class=s1>&#39;weather_stations&#39;</span><span class=p>)</span></span></span></code></pre></div></div></div><p class=language-java>The Beam SDK for Java also provides the <a href=https://beam.apache.org/releases/javadoc/2.55.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.html><code>parseTableSpec</code></a>
helper method, which constructs a <code>TableReference</code> object from a String that
contains the fully-qualified BigQuery table name. However, the static factory
methods for BigQueryIO transforms accept the table name as a String and
construct a <code>TableReference</code> object for you.</p><h3 id=table-rows>Table rows</h3><p>BigQueryIO read and write transforms produce and consume data as a <code>PCollection</code>
of dictionaries, where each element in the <code>PCollection</code> represents a single row
in the table.</p><h3 id=schemas>Schemas</h3><p>When writing to BigQuery, you must supply a table schema for the destination
table that you want to write to, unless you specify a <a href=#create-disposition>create
disposition</a> of <code>CREATE_NEVER</code>. <a href=#creating-a-table-schema>Creating a table
schema</a> covers schemas in more detail.</p><h3 id=data-types>Data types</h3><p>BigQuery supports the following data types: STRING, BYTES, INTEGER, FLOAT,
NUMERIC, BOOLEAN, TIMESTAMP, DATE, TIME, DATETIME and GEOGRAPHY. For an
overview of Google Standard SQL data types, see
<a href=https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types>Data types</a>.
BigQueryIO allows you to use all of these data types. The following example
shows the correct format for data types used when reading from and writing to
BigQuery:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kn>import</span> <span class=nn>com.google.api.services.bigquery.model.TableRow</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>java.math.BigDecimal</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>java.nio.charset.StandardCharsets</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>java.time.Instant</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>java.time.LocalDate</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>java.time.LocalDateTime</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>java.time.LocalTime</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>java.util.AbstractMap.SimpleEntry</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>java.util.Arrays</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>java.util.Base64</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>java.util.stream.Collectors</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>java.util.stream.Stream</span><span class=o>;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=kd>class</span> <span class=nc>BigQueryTableRowCreate</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kd>static</span> <span class=n>TableRow</span> <span class=nf>createTableRow</span><span class=o>()</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=n>TableRow</span> <span class=n>row</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableRow</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=c1>// To learn more about BigQuery data types:
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;string_field&#34;</span><span class=o>,</span> <span class=s>&#34;UTF-8 strings are supported! 🌱🌳🌍&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;int64_field&#34;</span><span class=o>,</span> <span class=n>432</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;float64_field&#34;</span><span class=o>,</span> <span class=n>3</span><span class=o>.</span><span class=na>141592653589793</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;numeric_field&#34;</span><span class=o>,</span> <span class=k>new</span> <span class=n>BigDecimal</span><span class=o>(</span><span class=s>&#34;1234.56&#34;</span><span class=o>).</span><span class=na>toString</span><span class=o>())</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;bool_field&#34;</span><span class=o>,</span> <span class=kc>true</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>set</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;bytes_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>Base64</span><span class=o>.</span><span class=na>getEncoder</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>encodeToString</span><span class=o>(</span><span class=s>&#34;UTF-8 byte string 🌱🌳🌍&#34;</span><span class=o>.</span><span class=na>getBytes</span><span class=o>(</span><span class=n>StandardCharsets</span><span class=o>.</span><span class=na>UTF_8</span><span class=o>)))</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=c1>// To learn more about date formatting:
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;date_field&#34;</span><span class=o>,</span> <span class=n>LocalDate</span><span class=o>.</span><span class=na>parse</span><span class=o>(</span><span class=s>&#34;2020-03-19&#34;</span><span class=o>).</span><span class=na>toString</span><span class=o>())</span> <span class=c1>// ISO_LOCAL_DATE
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>.</span><span class=na>set</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;datetime_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>LocalDateTime</span><span class=o>.</span><span class=na>parse</span><span class=o>(</span><span class=s>&#34;2020-03-19T20:41:25.123&#34;</span><span class=o>).</span><span class=na>toString</span><span class=o>())</span> <span class=c1>// ISO_LOCAL_DATE_TIME
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;time_field&#34;</span><span class=o>,</span> <span class=n>LocalTime</span><span class=o>.</span><span class=na>parse</span><span class=o>(</span><span class=s>&#34;20:41:25.123&#34;</span><span class=o>).</span><span class=na>toString</span><span class=o>())</span> <span class=c1>// ISO_LOCAL_TIME
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>.</span><span class=na>set</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;timestamp_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>Instant</span><span class=o>.</span><span class=na>parse</span><span class=o>(</span><span class=s>&#34;2020-03-20T03:41:42.123Z&#34;</span><span class=o>).</span><span class=na>toString</span><span class=o>())</span> <span class=c1>// ISO_INSTANT
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl> <span class=c1>// To learn more about the geography Well-Known Text (WKT) format:
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;geography_field&#34;</span><span class=o>,</span> <span class=s>&#34;POINT(30 10)&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=c1>// An array has its mode set to REPEATED.
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;array_field&#34;</span><span class=o>,</span> <span class=n>Arrays</span><span class=o>.</span><span class=na>asList</span><span class=o>(</span><span class=n>1</span><span class=o>,</span> <span class=n>2</span><span class=o>,</span> <span class=n>3</span><span class=o>,</span> <span class=n>4</span><span class=o>))</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=c1>// Any class can be written as a STRUCT as long as all the fields in the
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// schema are present and they are encoded correctly as BigQuery types.
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>.</span><span class=na>set</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;struct_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>Stream</span><span class=o>.</span><span class=na>of</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>SimpleEntry</span><span class=o>&lt;&gt;(</span><span class=s>&#34;string_value&#34;</span><span class=o>,</span> <span class=s>&#34;Text 🌱🌳🌍&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>SimpleEntry</span><span class=o>&lt;&gt;(</span><span class=s>&#34;int64_value&#34;</span><span class=o>,</span> <span class=s>&#34;42&#34;</span><span class=o>))</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>collect</span><span class=o>(</span><span class=n>Collectors</span><span class=o>.</span><span class=na>toMap</span><span class=o>(</span><span class=n>SimpleEntry</span><span class=o>::</span><span class=n>getKey</span><span class=o>,</span> <span class=n>SimpleEntry</span><span class=o>::</span><span class=n>getValue</span><span class=o>)));</span>
</span></span><span class=line><span class=cl> <span class=k>return</span> <span class=n>row</span><span class=o>;</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>bigquery_data</span> <span class=o>=</span> <span class=p>[{</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;string&#39;</span><span class=p>:</span> <span class=s1>&#39;abc&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;bytes&#39;</span><span class=p>:</span> <span class=n>base64</span><span class=o>.</span><span class=n>b64encode</span><span class=p>(</span><span class=sa>b</span><span class=s1>&#39;</span><span class=se>\xab\xac</span><span class=s1>&#39;</span><span class=p>),</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;integer&#39;</span><span class=p>:</span> <span class=mi>5</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;float&#39;</span><span class=p>:</span> <span class=mf>0.5</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;numeric&#39;</span><span class=p>:</span> <span class=n>Decimal</span><span class=p>(</span><span class=s1>&#39;5&#39;</span><span class=p>),</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;boolean&#39;</span><span class=p>:</span> <span class=kc>True</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;timestamp&#39;</span><span class=p>:</span> <span class=s1>&#39;2018-12-31 12:44:31.744957 UTC&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;date&#39;</span><span class=p>:</span> <span class=s1>&#39;2018-12-31&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;time&#39;</span><span class=p>:</span> <span class=s1>&#39;12:44:31&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;datetime&#39;</span><span class=p>:</span> <span class=s1>&#39;2018-12-31T12:44:31&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;geography&#39;</span><span class=p>:</span> <span class=s1>&#39;POINT(30 10)&#39;</span>
</span></span><span class=line><span class=cl><span class=p>}]</span></span></span></code></pre></div></div></div><p class=language-java>As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports
high-precision decimal numbers (precision of 38 digits, scale of 9 digits).
The GEOGRAPHY data type works with Well-Known Text (see <a href=https://en.wikipedia.org/wiki/Well-known_text>https://en.wikipedia.org/wiki/Well-known_text</a>)
format for reading from and writing to BigQuery.
BigQueryIO requires values of the BYTES data type to be encoded using base64
encoding when writing to BigQuery. When bytes are read from BigQuery they are
returned as base64-encoded strings.</p><p class=language-py>As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports
high-precision decimal numbers (precision of 38 digits, scale of 9 digits).
The GEOGRAPHY data type works with Well-Known Text (see <a href=https://en.wikipedia.org/wiki/Well-known_text>https://en.wikipedia.org/wiki/Well-known_text</a>)
format for reading from and writing to BigQuery.
BigQueryIO requires values of the BYTES data type to be encoded using base64
encoding when writing to BigQuery. When bytes are read from BigQuery they are
returned as base64-encoded bytes.</p><h2 id=reading-from-bigquery>Reading from BigQuery</h2><p>BigQueryIO allows you to read from a BigQuery table, or to execute a SQL query
and read the results. By default, Beam invokes a <a href=https://cloud.google.com/bigquery/docs/exporting-data>BigQuery export
request</a> when you apply a
BigQueryIO read transform. However, the Beam SDK for Java also supports using
the <a href=https://cloud.google.com/bigquery/docs/reference/storage>BigQuery Storage Read
API</a> to read directly
from BigQuery storage. See <a href=#storage-api>Using the Storage Read API</a> for
more information.</p><blockquote><p>Beam’s use of BigQuery APIs is subject to BigQuery&rsquo;s
<a href=https://cloud.google.com/bigquery/quota-policy>Quota</a>
and <a href=https://cloud.google.com/bigquery/pricing>Pricing</a> policies.</p></blockquote><p class=language-java>The Beam SDK for Java has two BigQueryIO read methods. Both of these methods
allow you to read from a table, or read fields using a query string.</p><span class=language-java><ol><li><p><code>read(SerializableFunction)</code> reads Avro-formatted records and uses a
specified parsing function to parse them into a <code>PCollection</code> of custom typed
objects. Each element in the <code>PCollection</code> represents a single row in the
table. The <a href=#reading-with-a-query-string>example code</a> for reading with a
query string shows how to use <code>read(SerializableFunction)</code>.</p></li><li><p><code>readTableRows</code> returns a <code>PCollection</code> of BigQuery <code>TableRow</code>
objects. Each element in the <code>PCollection</code> represents a single row in the
table. Integer values in the <code>TableRow</code> objects are encoded as strings to
match BigQuery&rsquo;s exported JSON format. This method is convenient, but can be
2-3 times slower in performance compared to <code>read(SerializableFunction)</code>. The
<a href=#reading-from-a-table>example code</a> for reading from a table shows how to
use <code>readTableRows</code>.</p></li></ol></span><p class=language-java><em><strong>Note:</strong></em> <code>BigQueryIO.read()</code> is deprecated as of Beam SDK 2.2.0. Instead, use
<code>read(SerializableFunction&lt;SchemaAndRecord, T>)</code> to parse BigQuery rows from
Avro <code>GenericRecord</code> into your custom type, or use <code>readTableRows()</code> to parse
them into JSON <code>TableRow</code> objects.</p><p class=language-py>To read from a BigQuery table using the Beam SDK for Python, apply a <code>ReadFromBigQuery</code>
transform. <code>ReadFromBigQuery</code> returns a <code>PCollection</code> of dictionaries,
where each element in the <code>PCollection</code> represents a single row in the table.
Integer values in the <code>TableRow</code> objects are encoded as strings to match
BigQuery&rsquo;s exported JSON format.</p><p class=language-py><em><strong>Note:</strong></em> <code>BigQuerySource()</code> is deprecated as of Beam SDK 2.25.0. Before 2.25.0, to read from
a BigQuery table using the Beam SDK, apply a <code>Read</code> transform on a <code>BigQuerySource</code>. For example,
<code>beam.io.Read(beam.io.BigQuerySource(table_spec))</code>.</p><h3 id=reading-from-a-table>Reading from a table</h3><p class=language-java>To read an entire BigQuery table, use the <code>from</code> method with a BigQuery table
name. This example uses <code>readTableRows</code>.</p><p class=language-py>To read an entire BigQuery table, use the <code>table</code> parameter with the BigQuery
table name.</p><p>The following code reads an entire table that contains weather station data and
then extracts the <code>max_temperature</code> column.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.Pipeline</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.transforms.MapElements</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.values.PCollection</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.values.TypeDescriptor</span><span class=o>;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=kd>class</span> <span class=nc>BigQueryReadFromTable</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kd>static</span> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>MyData</span><span class=o>&gt;</span> <span class=nf>readFromTable</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>String</span> <span class=n>project</span><span class=o>,</span> <span class=n>String</span> <span class=n>dataset</span><span class=o>,</span> <span class=n>String</span> <span class=n>table</span><span class=o>,</span> <span class=n>Pipeline</span> <span class=n>pipeline</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=c1>// String project = &#34;my-project-id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// String dataset = &#34;my_bigquery_dataset_id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// String table = &#34;my_bigquery_table_id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl> <span class=c1>// Pipeline pipeline = Pipeline.create();
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>MyData</span><span class=o>&gt;</span> <span class=n>rows</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=n>pipeline</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;Read from BigQuery query&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>BigQueryIO</span><span class=o>.</span><span class=na>readTableRows</span><span class=o>().</span><span class=na>from</span><span class=o>(</span><span class=n>String</span><span class=o>.</span><span class=na>format</span><span class=o>(</span><span class=s>&#34;%s:%s.%s&#34;</span><span class=o>,</span> <span class=n>project</span><span class=o>,</span> <span class=n>dataset</span><span class=o>,</span> <span class=n>table</span><span class=o>)))</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;TableRows to MyData&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>MapElements</span><span class=o>.</span><span class=na>into</span><span class=o>(</span><span class=n>TypeDescriptor</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>MyData</span><span class=o>.</span><span class=na>class</span><span class=o>)).</span><span class=na>via</span><span class=o>(</span><span class=n>MyData</span><span class=o>::</span><span class=n>fromTableRow</span><span class=o>));</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=k>return</span> <span class=n>rows</span><span class=o>;</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>max_temperatures</span> <span class=o>=</span> <span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>pipeline</span>
</span></span><span class=line><span class=cl> <span class=o>|</span> <span class=s1>&#39;ReadTable&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>ReadFromBigQuery</span><span class=p>(</span><span class=n>table</span><span class=o>=</span><span class=n>table_spec</span><span class=p>)</span>
</span></span><span class=line><span class=cl> <span class=c1># Each row is a dictionary where the keys are the BigQuery columns</span>
</span></span><span class=line><span class=cl> <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>Map</span><span class=p>(</span><span class=k>lambda</span> <span class=n>elem</span><span class=p>:</span> <span class=n>elem</span><span class=p>[</span><span class=s1>&#39;max_temperature&#39;</span><span class=p>]))</span></span></span></code></pre></div></div></div><h3 id=reading-with-a-query-string>Reading with a query string</h3><p class=language-java>If you don&rsquo;t want to read an entire table, you can supply a query string with
the <code>fromQuery</code> method.</p><p class=language-py>If you don&rsquo;t want to read an entire table, you can supply a query string to
<code>ReadFromBigQuery</code> by specifying the <code>query</code> parameter.</p><p class=language-py>The following code uses a SQL query to only read the <code>max_temperature</code> column.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.Pipeline</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.transforms.MapElements</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.values.PCollection</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.values.TypeDescriptor</span><span class=o>;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=kd>class</span> <span class=nc>BigQueryReadFromQuery</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kd>static</span> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>MyData</span><span class=o>&gt;</span> <span class=nf>readFromQuery</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>String</span> <span class=n>project</span><span class=o>,</span> <span class=n>String</span> <span class=n>dataset</span><span class=o>,</span> <span class=n>String</span> <span class=n>table</span><span class=o>,</span> <span class=n>Pipeline</span> <span class=n>pipeline</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=c1>// String project = &#34;my-project-id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// String dataset = &#34;my_bigquery_dataset_id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// String table = &#34;my_bigquery_table_id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl> <span class=c1>// Pipeline pipeline = Pipeline.create();
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>MyData</span><span class=o>&gt;</span> <span class=n>rows</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=n>pipeline</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;Read from BigQuery query&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>BigQueryIO</span><span class=o>.</span><span class=na>readTableRows</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>fromQuery</span><span class=o>(</span><span class=n>String</span><span class=o>.</span><span class=na>format</span><span class=o>(</span><span class=s>&#34;SELECT * FROM `%s.%s.%s`&#34;</span><span class=o>,</span> <span class=n>project</span><span class=o>,</span> <span class=n>dataset</span><span class=o>,</span> <span class=n>table</span><span class=o>))</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>usingStandardSql</span><span class=o>())</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;TableRows to MyData&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>MapElements</span><span class=o>.</span><span class=na>into</span><span class=o>(</span><span class=n>TypeDescriptor</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>MyData</span><span class=o>.</span><span class=na>class</span><span class=o>)).</span><span class=na>via</span><span class=o>(</span><span class=n>MyData</span><span class=o>::</span><span class=n>fromTableRow</span><span class=o>));</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=k>return</span> <span class=n>rows</span><span class=o>;</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>max_temperatures</span> <span class=o>=</span> <span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>pipeline</span>
</span></span><span class=line><span class=cl> <span class=o>|</span> <span class=s1>&#39;QueryTable&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>ReadFromBigQuery</span><span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>query</span><span class=o>=</span><span class=s1>&#39;SELECT max_temperature FROM &#39;</span>\
</span></span><span class=line><span class=cl> <span class=s1>&#39;[apache-beam-testing.samples.weather_stations]&#39;</span><span class=p>)</span>
</span></span><span class=line><span class=cl> <span class=c1># Each row is a dictionary where the keys are the BigQuery columns</span>
</span></span><span class=line><span class=cl> <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>Map</span><span class=p>(</span><span class=k>lambda</span> <span class=n>elem</span><span class=p>:</span> <span class=n>elem</span><span class=p>[</span><span class=s1>&#39;max_temperature&#39;</span><span class=p>]))</span></span></span></code></pre></div></div></div><p>You can also use BigQuery&rsquo;s standard SQL dialect with a query string, as shown
in the following example:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>Double</span><span class=o>&gt;</span> <span class=n>maxTemperatures</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=n>p</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>BigQueryIO</span><span class=o>.</span><span class=na>read</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=o>(</span><span class=n>SchemaAndRecord</span> <span class=n>elem</span><span class=o>)</span> <span class=o>-&gt;</span> <span class=o>(</span><span class=n>Double</span><span class=o>)</span> <span class=n>elem</span><span class=o>.</span><span class=na>getRecord</span><span class=o>().</span><span class=na>get</span><span class=o>(</span><span class=s>&#34;max_temperature&#34;</span><span class=o>))</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>fromQuery</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>usingStandardSql</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withCoder</span><span class=o>(</span><span class=n>DoubleCoder</span><span class=o>.</span><span class=na>of</span><span class=o>()));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>max_temperatures</span> <span class=o>=</span> <span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>pipeline</span>
</span></span><span class=line><span class=cl> <span class=o>|</span> <span class=s1>&#39;QueryTableStdSQL&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>ReadFromBigQuery</span><span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>query</span><span class=o>=</span><span class=s1>&#39;SELECT max_temperature FROM &#39;</span>\
</span></span><span class=line><span class=cl> <span class=s1>&#39;`clouddataflow-readonly.samples.weather_stations`&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>use_standard_sql</span><span class=o>=</span><span class=kc>True</span><span class=p>)</span>
</span></span><span class=line><span class=cl> <span class=c1># Each row is a dictionary where the keys are the BigQuery columns</span>
</span></span><span class=line><span class=cl> <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>Map</span><span class=p>(</span><span class=k>lambda</span> <span class=n>elem</span><span class=p>:</span> <span class=n>elem</span><span class=p>[</span><span class=s1>&#39;max_temperature&#39;</span><span class=p>]))</span></span></span></code></pre></div></div></div><h4 id=query-execution-project>Query execution project</h4><p>By default the pipeline executes the query in the Google Cloud project associated with the pipeline (in case of the Dataflow runner it&rsquo;s the project where the pipeline runs). There are cases where the query execution project should be different from the pipeline project. If you use the Java SDK, you can define the query execution project by setting the pipeline option &ldquo;<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.html#getBigQueryProject-->bigQueryProject</a>&rdquo; to the desired Google Cloud project id.</p><h3 id=storage-api>Using the Storage Read API</h3><p>The <a href=https://cloud.google.com/bigquery/docs/reference/storage/>BigQuery Storage API</a>
allows you to directly access tables in BigQuery storage, and supports features
such as column selection and predicate filter push-down which can allow more
efficient pipeline execution.</p><p>The Beam SDK for Java supports using the BigQuery Storage API when reading from
BigQuery. SDK versions before 2.25.0 support the BigQuery Storage API as an
<a href=https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/annotations/Experimental.html>experimental feature</a>
and use the pre-GA BigQuery Storage API surface. Callers should migrate
pipelines which use the BigQuery Storage API to use SDK version 2.25.0 or later.</p><p>The Beam SDK for Python supports the BigQuery Storage API. Enable it
by passing <code>method=DIRECT_READ</code> as a parameter to <code>ReadFromBigQuery</code>.</p><h4 id=updating-your-code>Updating your code</h4><p>Use the following methods when you read from a table:</p><ul><li>Required: Specify <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.TypedRead.html#withMethod-org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method->withMethod(Method.DIRECT_READ)</a> to use the BigQuery Storage API for the read operation.</li><li>Optional: To use features such as <a href=https://cloud.google.com/bigquery/docs/reference/storage/>column projection and column filtering</a>, you must specify <a href=https://beam.apache.org/releases/javadoc/2.17.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.TypedRead.html#withSelectedFields-java.util.List->withSelectedFields</a> and <a href=https://beam.apache.org/releases/javadoc/2.17.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.TypedRead.html#withRowRestriction-java.lang.String->withRowRestriction</a> respectively.</li></ul><p>The following code snippet reads from a table. This example is from the <a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java>BigQueryTornadoes
example</a>.
When the example&rsquo;s read method option is set to <code>DIRECT_READ</code>, the pipeline uses
the BigQuery Storage API and column projection to read public samples of weather
data from a BigQuery table. You can view the <a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java>full source code on
GitHub</a>.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kn>import</span> <span class=nn>java.util.Arrays</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.Pipeline</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.transforms.MapElements</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.values.PCollection</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.values.TypeDescriptor</span><span class=o>;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=kd>class</span> <span class=nc>BigQueryReadFromTableWithBigQueryStorageAPI</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kd>static</span> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>MyData</span><span class=o>&gt;</span> <span class=nf>readFromTableWithBigQueryStorageAPI</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>String</span> <span class=n>project</span><span class=o>,</span> <span class=n>String</span> <span class=n>dataset</span><span class=o>,</span> <span class=n>String</span> <span class=n>table</span><span class=o>,</span> <span class=n>Pipeline</span> <span class=n>pipeline</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=c1>// String project = &#34;my-project-id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// String dataset = &#34;my_bigquery_dataset_id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// String table = &#34;my_bigquery_table_id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl> <span class=c1>// Pipeline pipeline = Pipeline.create();
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>MyData</span><span class=o>&gt;</span> <span class=n>rows</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=n>pipeline</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;Read from BigQuery table&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>BigQueryIO</span><span class=o>.</span><span class=na>readTableRows</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>from</span><span class=o>(</span><span class=n>String</span><span class=o>.</span><span class=na>format</span><span class=o>(</span><span class=s>&#34;%s:%s.%s&#34;</span><span class=o>,</span> <span class=n>project</span><span class=o>,</span> <span class=n>dataset</span><span class=o>,</span> <span class=n>table</span><span class=o>))</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withMethod</span><span class=o>(</span><span class=n>Method</span><span class=o>.</span><span class=na>DIRECT_READ</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withSelectedFields</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>Arrays</span><span class=o>.</span><span class=na>asList</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;string_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;int64_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;float64_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;numeric_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;bool_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;bytes_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;date_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;datetime_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;time_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;timestamp_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;geography_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;array_field&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;struct_field&#34;</span><span class=o>)))</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;TableRows to MyData&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>MapElements</span><span class=o>.</span><span class=na>into</span><span class=o>(</span><span class=n>TypeDescriptor</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>MyData</span><span class=o>.</span><span class=na>class</span><span class=o>)).</span><span class=na>via</span><span class=o>(</span><span class=n>MyData</span><span class=o>::</span><span class=n>fromTableRow</span><span class=o>));</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=k>return</span> <span class=n>rows</span><span class=o>;</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>max_temperatures</span> <span class=o>=</span> <span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>pipeline</span>
</span></span><span class=line><span class=cl> <span class=o>|</span> <span class=s1>&#39;ReadTableWithStorageAPI&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>ReadFromBigQuery</span><span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>table</span><span class=o>=</span><span class=n>table_spec</span><span class=p>,</span> <span class=n>method</span><span class=o>=</span><span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>ReadFromBigQuery</span><span class=o>.</span><span class=n>Method</span><span class=o>.</span><span class=n>DIRECT_READ</span><span class=p>)</span>
</span></span><span class=line><span class=cl> <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>Map</span><span class=p>(</span><span class=k>lambda</span> <span class=n>elem</span><span class=p>:</span> <span class=n>elem</span><span class=p>[</span><span class=s1>&#39;max_temperature&#39;</span><span class=p>]))</span></span></span></code></pre></div></div></div><p>The following code snippet reads with a query string.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.Pipeline</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.transforms.MapElements</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.values.PCollection</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.values.TypeDescriptor</span><span class=o>;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=kd>class</span> <span class=nc>BigQueryReadFromQueryWithBigQueryStorageAPI</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kd>static</span> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>MyData</span><span class=o>&gt;</span> <span class=nf>readFromQueryWithBigQueryStorageAPI</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>String</span> <span class=n>project</span><span class=o>,</span> <span class=n>String</span> <span class=n>dataset</span><span class=o>,</span> <span class=n>String</span> <span class=n>table</span><span class=o>,</span> <span class=n>String</span> <span class=n>query</span><span class=o>,</span> <span class=n>Pipeline</span> <span class=n>pipeline</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=c1>// String project = &#34;my-project-id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// String dataset = &#34;my_bigquery_dataset_id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// String table = &#34;my_bigquery_table_id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl> <span class=c1>// Pipeline pipeline = Pipeline.create();
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl> <span class=cm>/*
</span></span></span><span class=line><span class=cl><span class=cm> String query = String.format(&#34;SELECT\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; string_field,\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; int64_field,\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; float64_field,\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; numeric_field,\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; bool_field,\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; bytes_field,\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; date_field,\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; datetime_field,\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; time_field,\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; timestamp_field,\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; geography_field,\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; array_field,\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; struct_field\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34;FROM\n&#34; +
</span></span></span><span class=line><span class=cl><span class=cm> &#34; `%s:%s.%s`&#34;, project, dataset, table)
</span></span></span><span class=line><span class=cl><span class=cm> */</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>MyData</span><span class=o>&gt;</span> <span class=n>rows</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=n>pipeline</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;Read from BigQuery table&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>BigQueryIO</span><span class=o>.</span><span class=na>readTableRows</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>fromQuery</span><span class=o>(</span><span class=n>query</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>usingStandardSql</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withMethod</span><span class=o>(</span><span class=n>Method</span><span class=o>.</span><span class=na>DIRECT_READ</span><span class=o>))</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;TableRows to MyData&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>MapElements</span><span class=o>.</span><span class=na>into</span><span class=o>(</span><span class=n>TypeDescriptor</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>MyData</span><span class=o>.</span><span class=na>class</span><span class=o>)).</span><span class=na>via</span><span class=o>(</span><span class=n>MyData</span><span class=o>::</span><span class=n>fromTableRow</span><span class=o>));</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=k>return</span> <span class=n>rows</span><span class=o>;</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=c1># The SDK for Python does not support the BigQuery Storage API.</span></span></span></code></pre></div></div></div><h2 id=writing-to-bigquery>Writing to BigQuery</h2><p>BigQueryIO lets you write to BigQuery tables. If you are using the Beam SDK
for Java, you can write different rows to different tables. The Beam SDK for
Java also supports using the
<a href=https://cloud.google.com/bigquery/docs/write-api>BigQuery Storage Write API</a>
to write directly to BigQuery storage. For more information, see
<a href=#storage-write-api>Using the Storage Write API</a>.</p><blockquote><p>BigQueryIO write transforms use APIs that are subject to BigQuery&rsquo;s
<a href=https://cloud.google.com/bigquery/quota-policy>Quota</a> and
<a href=https://cloud.google.com/bigquery/pricing>Pricing</a> policies.</p></blockquote><p>When you apply a write transform, you must provide the following information
for the destination table(s):</p><ul><li>The table name.</li><li>The destination table&rsquo;s create disposition. The create disposition specifies
whether the destination table must exist or can be created by the write
operation.</li><li>The destination table&rsquo;s write disposition. The write disposition specifies
whether the data you write replaces an existing table, appends rows to an
existing table, or writes only to an empty table.</li></ul><p>In addition, if your write operation creates a new BigQuery table, you must also
supply a table schema for the destination table.</p><h3 id=create-disposition>Create disposition</h3><p>The create disposition controls whether or not your BigQuery write operation
should create a table if the destination table does not exist.</p><p class=language-java>Use <code>.withCreateDisposition</code> to specify the create disposition. Valid enum
values are:</p><span class=language-java><ul><li><p><code>Write.CreateDisposition.CREATE_IF_NEEDED</code>: Specifies that the
write operation should create a new table if one does not exist. If you use
this value, you must provide a table schema with the <code>withSchema</code> method.
<code>CREATE_IF_NEEDED</code> is the default behavior.</p></li><li><p><code>Write.CreateDisposition.CREATE_NEVER</code>: Specifies that a table
should never be created. If the destination table does not exist, the write
operation fails.</p></li></ul></span><p class=language-py>Use the <code>create_disposition</code> parameter to specify the create disposition. Valid
enum values are:</p><span class=language-py><ul><li><p><code>BigQueryDisposition.CREATE_IF_NEEDED</code>: Specifies that the write operation
should create a new table if one does not exist. If you use this value, you
must provide a table schema. <code>CREATE_IF_NEEDED</code> is the default behavior.</p></li><li><p><code>BigQueryDisposition.CREATE_NEVER</code>: Specifies that a table should never be
created. If the destination table does not exist, the write operation fails.</p></li></ul></span><p>If you specify <code>CREATE_IF_NEEDED</code> as the create disposition and you don&rsquo;t supply
a table schema, the transform might fail at runtime if the destination table does
not exist.</p><h3 id=write-disposition>Write disposition</h3><p>The write disposition controls how your BigQuery write operation applies to an
existing table.</p><p class=language-java>Use <code>.withWriteDisposition</code> to specify the write disposition. Valid enum values
are:</p><span class=language-java><ul><li><p><code>Write.WriteDisposition.WRITE_EMPTY</code>: Specifies that the write
operation should fail at runtime if the destination table is not empty.
<code>WRITE_EMPTY</code> is the default behavior.</p></li><li><p><code>Write.WriteDisposition.WRITE_TRUNCATE</code>: Specifies that the write
operation should replace an existing table. Any existing rows in the
destination table are removed, and the new rows are added to the table.</p></li><li><p><code>Write.WriteDisposition.WRITE_APPEND</code>: Specifies that the write
operation should append the rows to the end of the existing table.</p></li></ul></span><p class=language-py>Use the <code>write_disposition</code> parameter to specify the write disposition. Valid
enum values are:</p><span class=language-py><ul><li><p><code>BigQueryDisposition.WRITE_EMPTY</code>: Specifies that the write operation should
fail at runtime if the destination table is not empty. <code>WRITE_EMPTY</code> is the
default behavior.</p></li><li><p><code>BigQueryDisposition.WRITE_TRUNCATE</code>: Specifies that the write operation
should replace an existing table. Any existing rows in the destination table
are removed, and the new rows are added to the table.</p></li><li><p><code>BigQueryDisposition.WRITE_APPEND</code>: Specifies that the write operation should
append the rows to the end of the existing table.</p></li></ul></span><p>When you use <code>WRITE_EMPTY</code>, the check for whether or not the destination table
is empty can occur before the actual write operation. This check doesn&rsquo;t
guarantee that your pipeline will have exclusive access to the table. Two
concurrent pipelines that write to the same output table with a write
disposition of <code>WRITE_EMPTY</code> might start successfully, but both pipelines can
fail later when the write attempts happen.</p><h3 id=creating-a-table-schema>Creating a table schema</h3><p>If your BigQuery write operation creates a new table, you must provide schema
information. The schema contains information about each field in the table.
When updating a pipeline with a new schema, the existing schema fields must
stay in the same order, or the pipeline will break, failing to write to BigQuery.</p><p class=language-java>To create a table schema in Java, you can either use a <code>TableSchema</code> object, or
use a string that contains a JSON-serialized <code>TableSchema</code> object.</p><p class=language-py>To create a table schema in Python, you can either use a <code>TableSchema</code> object,
or use a string that defines a list of fields. Schemas defined as a single string do
not support nested fields, repeated fields, or specifying a BigQuery mode for
fields (the mode is always set to <code>NULLABLE</code>).</p><h4 id=using-a-tableschema>Using a TableSchema</h4><p>To create and use a table schema as a <code>TableSchema</code> object, follow these steps.</p><span class=language-java><ol><li><p>Create a list of <code>TableFieldSchema</code> objects. Each <code>TableFieldSchema</code> object
represents a field in the table.</p></li><li><p>Create a <code>TableSchema</code> object and use the <code>setFields</code> method to specify your
list of fields.</p></li><li><p>Use the <code>withSchema</code> method to provide your table schema when you apply a
write transform.</p></li></ol></span><span class=language-py><ol><li><p>Create a <code>TableSchema</code> object.</p></li><li><p>Create and append a <code>TableFieldSchema</code> object for each field in your table.</p></li><li><p>Use the <code>schema</code> parameter to provide your table schema when you apply
a write transform. Set the parameter’s value to the <code>TableSchema</code> object.</p></li></ol></span><p>The following example code shows how to create a <code>TableSchema</code> for a table with
two fields (source and quote) of type string.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kn>import</span> <span class=nn>com.google.api.services.bigquery.model.TableFieldSchema</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>com.google.api.services.bigquery.model.TableSchema</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>java.util.Arrays</span><span class=o>;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=kd>class</span> <span class=nc>BigQuerySchemaCreate</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kd>static</span> <span class=n>TableSchema</span> <span class=nf>createSchema</span><span class=o>()</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=c1>// To learn more about BigQuery schemas:
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// https://cloud.google.com/bigquery/docs/schemas
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=n>TableSchema</span> <span class=n>schema</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableSchema</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setFields</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>Arrays</span><span class=o>.</span><span class=na>asList</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;string_field&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;STRING&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setMode</span><span class=o>(</span><span class=s>&#34;REQUIRED&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;int64_field&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;INT64&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setMode</span><span class=o>(</span><span class=s>&#34;NULLABLE&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;float64_field&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;FLOAT64&#34;</span><span class=o>),</span> <span class=c1>// default mode is &#34;NULLABLE&#34;
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>().</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;numeric_field&#34;</span><span class=o>).</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;NUMERIC&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>().</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;bool_field&#34;</span><span class=o>).</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;BOOL&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>().</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;bytes_field&#34;</span><span class=o>).</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;BYTES&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>().</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;date_field&#34;</span><span class=o>).</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;DATE&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>().</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;datetime_field&#34;</span><span class=o>).</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;DATETIME&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>().</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;time_field&#34;</span><span class=o>).</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;TIME&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>().</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;timestamp_field&#34;</span><span class=o>).</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;TIMESTAMP&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>().</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;geography_field&#34;</span><span class=o>).</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;GEOGRAPHY&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;array_field&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;INT64&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setMode</span><span class=o>(</span><span class=s>&#34;REPEATED&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setDescription</span><span class=o>(</span><span class=s>&#34;Setting the mode to REPEATED makes this an ARRAY&lt;INT64&gt;.&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;struct_field&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;STRUCT&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setDescription</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;A STRUCT accepts a custom data class, the fields must match the custom class fields.&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setFields</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>Arrays</span><span class=o>.</span><span class=na>asList</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>().</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;string_value&#34;</span><span class=o>).</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;STRING&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>().</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;int64_value&#34;</span><span class=o>).</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;INT64&#34;</span><span class=o>)))));</span>
</span></span><span class=line><span class=cl> <span class=k>return</span> <span class=n>schema</span><span class=o>;</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>table_schema</span> <span class=o>=</span> <span class=p>{</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;fields&#39;</span><span class=p>:</span> <span class=p>[{</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;name&#39;</span><span class=p>:</span> <span class=s1>&#39;source&#39;</span><span class=p>,</span> <span class=s1>&#39;type&#39;</span><span class=p>:</span> <span class=s1>&#39;STRING&#39;</span><span class=p>,</span> <span class=s1>&#39;mode&#39;</span><span class=p>:</span> <span class=s1>&#39;NULLABLE&#39;</span>
</span></span><span class=line><span class=cl> <span class=p>},</span> <span class=p>{</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;name&#39;</span><span class=p>:</span> <span class=s1>&#39;quote&#39;</span><span class=p>,</span> <span class=s1>&#39;type&#39;</span><span class=p>:</span> <span class=s1>&#39;STRING&#39;</span><span class=p>,</span> <span class=s1>&#39;mode&#39;</span><span class=p>:</span> <span class=s1>&#39;REQUIRED&#39;</span>
</span></span><span class=line><span class=cl> <span class=p>}]</span>
</span></span><span class=line><span class=cl><span class=p>}</span></span></span></code></pre></div></div></div><h4 id=using-a-string-1>Using a string</h4><p class=language-java>To create and use a table schema as a string that contains a JSON-serialized
<code>TableSchema</code> object, follow these steps.</p><span class=language-java><ol><li><p>Create a string that contains a JSON-serialized <code>TableSchema</code> object.</p></li><li><p>Use the <code>withJsonSchema</code> method to provide your table schema when you apply a
write transform.</p></li></ol></span><p class=language-py>To create and use a table schema as a string, follow these steps.</p><span class=language-py><ol><li><p>Create a single comma-separated string of the form
&ldquo;field1:type1,field2:type2,field3:type3&rdquo; that defines a list of fields. The
type should specify the field’s BigQuery type.</p></li><li><p>Use the <code>schema</code> parameter to provide your table schema when you apply a
write transform. Set the parameter’s value to the string.</p></li></ol></span><p>The following example shows how to use a string to specify the same table schema
as the previous example.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>String</span> <span class=n>tableSchemaJson</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34;{&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34; \&#34;fields\&#34;: [&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34; {&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34; \&#34;name\&#34;: \&#34;source\&#34;,&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34; \&#34;type\&#34;: \&#34;STRING\&#34;,&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34; \&#34;mode\&#34;: \&#34;NULLABLE\&#34;&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34; },&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34; {&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34; \&#34;name\&#34;: \&#34;quote\&#34;,&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34; \&#34;type\&#34;: \&#34;STRING\&#34;,&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34; \&#34;mode\&#34;: \&#34;REQUIRED\&#34;&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34; }&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34; ]&#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34;}&#34;</span><span class=o>;</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=c1># column_name:BIGQUERY_TYPE, ...</span>
</span></span><span class=line><span class=cl><span class=n>table_schema</span> <span class=o>=</span> <span class=s1>&#39;source:STRING, quote:STRING&#39;</span></span></span></code></pre></div></div></div><h3 id=setting-the-insertion-method>Setting the insertion method</h3><p>BigQueryIO supports two methods of inserting data into BigQuery: load jobs and
streaming inserts. Each insertion method provides different tradeoffs of cost,
quota, and data consistency. See the BigQuery documentation for
<a href=https://cloud.google.com/bigquery/loading-data>different data ingestion options</a>
(specifically, <a href=https://cloud.google.com/bigquery/docs/batch-loading-data>load jobs</a>
and <a href=https://cloud.google.com/bigquery/streaming-data-into-bigquery>streaming inserts</a>)
for more information about these tradeoffs.</p><p class=language-java>BigQueryIO chooses a default insertion method based on the input <code>PCollection</code>.
You can use <code>withMethod</code> to specify the desired insertion method. See
<a href=https://beam.apache.org/releases/javadoc/2.55.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.Method.html><code>Write.Method</code></a>
for the list of the available methods and their restrictions.</p><p class=language-py>BigQueryIO chooses a default insertion method based on the input <code>PCollection</code>.
You can use <code>method</code> to specify the desired insertion method. See
<a href=https://beam.apache.org/releases/pydoc/2.55.0/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery><code>WriteToBigQuery</code></a>
for the list of the available methods and their restrictions.</p><p>BigQueryIO uses load jobs in the following situations:</p><span class=language-java><ul><li>When you apply a BigQueryIO write transform to a bounded <code>PCollection</code>.</li><li>When you specify load jobs as the insertion method using
<code>BigQueryIO.write().withMethod(FILE_LOADS)</code>.</li></ul></span><span class=language-py><ul><li>When you apply a BigQueryIO write transform to a bounded <code>PCollection</code>.</li><li>When you specify load jobs as the insertion method using
<code>WriteToBigQuery(method='FILE_LOADS')</code>.</li></ul></span><p><em><strong>Note:</strong></em> If you use batch loads in a streaming pipeline:</p><p class=language-java>You must use <code>withTriggeringFrequency</code> to specify a triggering frequency for
initiating load jobs. Be careful about setting the frequency such that your
pipeline doesn&rsquo;t exceed the BigQuery load job <a href=https://cloud.google.com/bigquery/quotas#load_jobs>quota limit</a>.</p><p class=language-java>You can either use <code>withNumFileShards</code> to explicitly set the number of file
shards written, or use <code>withAutoSharding</code> to enable dynamic sharding (starting
with the 2.29.0 release), in which case the number of shards may be determined and changed at
runtime. The sharding behavior depends on the runners.</p><p class=language-py>You must use <code>triggering_frequency</code> to specify a triggering frequency for
initiating load jobs. Be careful about setting the frequency such that your
pipeline doesn&rsquo;t exceed the BigQuery load job <a href=https://cloud.google.com/bigquery/quotas#load_jobs>quota limit</a>.</p><p class=language-py>You can set <code>with_auto_sharding=True</code> to enable dynamic sharding (starting
with the 2.29.0 release). The number of shards may be determined and changed at runtime.
The sharding behavior depends on the runners.</p><p>BigQueryIO uses streaming inserts in the following situations:</p><span class=language-java><ul><li>When you apply a BigQueryIO write transform to an unbounded <code>PCollection</code>.</li><li>When you specify streaming inserts as the insertion method using
<code>BigQueryIO.write().withMethod(STREAMING_INSERTS)</code>.</li></ul></span><span class=language-py><ul><li>When you apply a BigQueryIO write transform to an unbounded <code>PCollection</code>.</li><li>When you specify streaming inserts as the insertion method using
<code>WriteToBigQuery(method='STREAMING_INSERTS')</code>.</li></ul></span><span class=language-java><p><em><strong>Note:</strong></em> By default, streaming inserts enable BigQuery&rsquo;s <a href=https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency>best-effort deduplication mechanism</a>.
You can disable it by setting <code>ignoreInsertIds</code>. The <a href=https://cloud.google.com/bigquery/quotas#streaming_inserts>quota limitations</a>
are different when deduplication is enabled vs. disabled.</p><p>Streaming inserts apply a default sharding for each table destination. You can
use <code>withAutoSharding</code> (starting with the 2.28.0 release) to enable dynamic sharding, in which case
the number of shards may be determined and changed at runtime. The sharding
behavior depends on the runners.</p></span><span class=language-py><p><em><strong>Note:</strong></em> By default, streaming inserts enable the BigQuery <a href=https://cloud.google.com/bigquery/streaming-data-into-bigquery#disabling_best_effort_de-duplication>best-effort deduplication mechanism</a>.
You can disable that by setting <code>ignore_insert_ids=True</code>. The <a href=https://cloud.google.com/bigquery/quotas#streaming_inserts>quota limitations</a>
are different when deduplication is enabled vs. disabled.</p><p>Streaming inserts apply a default sharding for each table destination. You can
set <code>with_auto_sharding=True</code> (starting with the 2.29.0 release) to enable dynamic
sharding. The number of shards may be determined and changed at runtime. The
sharding behavior depends on the runners.</p></span><h3 id=writing-to-a-table>Writing to a table</h3><p class=language-java>To write to a BigQuery table, apply either a <code>writeTableRows</code> or <code>write</code>
transform.</p><p class=language-py>To write to a BigQuery table, apply the <code>WriteToBigQuery</code> transform.
<code>WriteToBigQuery</code> supports both batch mode and streaming mode. You must apply
the transform to a <code>PCollection</code> of dictionaries. In general, you&rsquo;ll need to use
another transform, such as <code>ParDo</code>, to format your output data into a
collection.</p><p class=language-py>The following examples use this <code>PCollection</code> that contains quotes.</p><p class=language-java>The <code>writeTableRows</code> method writes a <code>PCollection</code> of BigQuery <code>TableRow</code>
objects to a BigQuery table. Each element in the <code>PCollection</code> represents a
single row in the table. This example uses <code>writeTableRows</code> to write the elements of a
<code>PCollection&lt;TableRow></code> to a BigQuery table. The write operation creates a table if needed. If the
table already exists, it is replaced.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kn>import</span> <span class=nn>com.google.api.services.bigquery.model.TableRow</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>com.google.api.services.bigquery.model.TableSchema</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition</span><span class=o>;</span>
</span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>org.apache.beam.sdk.values.PCollection</span><span class=o>;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=kd>class</span> <span class=nc>BigQueryWriteToTable</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kd>static</span> <span class=kt>void</span> <span class=nf>writeToTable</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>String</span> <span class=n>project</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>String</span> <span class=n>dataset</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>String</span> <span class=n>table</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>TableSchema</span> <span class=n>schema</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>TableRow</span><span class=o>&gt;</span> <span class=n>rows</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=c1>// String project = &#34;my-project-id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// String dataset = &#34;my_bigquery_dataset_id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// String table = &#34;my_bigquery_table_id&#34;;
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl> <span class=c1>// TableSchema schema = new TableSchema().setFields(Arrays.asList(...));
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl> <span class=c1>// Pipeline pipeline = Pipeline.create();
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// PCollection&lt;TableRow&gt; rows = ...
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl> <span class=n>rows</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;Write to BigQuery&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=n>BigQueryIO</span><span class=o>.</span><span class=na>writeTableRows</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>to</span><span class=o>(</span><span class=n>String</span><span class=o>.</span><span class=na>format</span><span class=o>(</span><span class=s>&#34;%s:%s.%s&#34;</span><span class=o>,</span> <span class=n>project</span><span class=o>,</span> <span class=n>dataset</span><span class=o>,</span> <span class=n>table</span><span class=o>))</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withSchema</span><span class=o>(</span><span class=n>schema</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=c1>// For CreateDisposition:
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// - CREATE_IF_NEEDED (default): creates the table if it doesn&#39;t exist, a schema is
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// required
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// - CREATE_NEVER: raises an error if the table doesn&#39;t exist, a schema is not needed
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>.</span><span class=na>withCreateDisposition</span><span class=o>(</span><span class=n>CreateDisposition</span><span class=o>.</span><span class=na>CREATE_IF_NEEDED</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=c1>// For WriteDisposition:
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// - WRITE_EMPTY (default): raises an error if the table is not empty
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// - WRITE_APPEND: appends new rows to existing rows
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// - WRITE_TRUNCATE: deletes the existing rows before writing
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>.</span><span class=na>withWriteDisposition</span><span class=o>(</span><span class=n>WriteDisposition</span><span class=o>.</span><span class=na>WRITE_TRUNCATE</span><span class=o>));</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=c1>// pipeline.run().waitUntilFinish();
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>quotes</span> <span class=o>=</span> <span class=n>pipeline</span> <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>Create</span><span class=p>([</span>
</span></span><span class=line><span class=cl> <span class=p>{</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;source&#39;</span><span class=p>:</span> <span class=s1>&#39;Mahatma Gandhi&#39;</span><span class=p>,</span> <span class=s1>&#39;quote&#39;</span><span class=p>:</span> <span class=s1>&#39;My life is my message.&#39;</span>
</span></span><span class=line><span class=cl> <span class=p>},</span>
</span></span><span class=line><span class=cl> <span class=p>{</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;source&#39;</span><span class=p>:</span> <span class=s1>&#39;Yoda&#39;</span><span class=p>,</span> <span class=s1>&#39;quote&#39;</span><span class=p>:</span> <span class=s2>&#34;Do, or do not. There is no &#39;try&#39;.&#34;</span>
</span></span><span class=line><span class=cl> <span class=p>},</span>
</span></span><span class=line><span class=cl><span class=p>])</span></span></span></code></pre></div></div></div><p class=language-py>The following example code shows how to apply a <code>WriteToBigQuery</code> transform to
write a <code>PCollection</code> of dictionaries to a BigQuery table. The write operation
creates a table if needed. If the table already exists, it is replaced.</p><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>quotes</span> <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>WriteToBigQuery</span><span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>table_spec</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>schema</span><span class=o>=</span><span class=n>table_schema</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>write_disposition</span><span class=o>=</span><span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>BigQueryDisposition</span><span class=o>.</span><span class=n>WRITE_TRUNCATE</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>create_disposition</span><span class=o>=</span><span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>BigQueryDisposition</span><span class=o>.</span><span class=n>CREATE_IF_NEEDED</span><span class=p>)</span></span></span></code></pre></div></div></div><p class=language-java>The <code>write</code> transform writes a <code>PCollection</code> of custom typed objects to a BigQuery
table. Use <code>.withFormatFunction(SerializableFunction)</code> to provide a formatting
function that converts each input element in the <code>PCollection</code> into a
<code>TableRow</code>. This example uses <code>write</code> to write a <code>PCollection&lt;Quote></code>. The
write operation creates a table if needed. If the table already exists, it is
replaced.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>quotes</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>BigQueryIO</span><span class=o>.&lt;</span><span class=n>Quote</span><span class=o>&gt;</span><span class=n>write</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>to</span><span class=o>(</span><span class=n>tableSpec</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withSchema</span><span class=o>(</span><span class=n>tableSchema</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withFormatFunction</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=o>(</span><span class=n>Quote</span> <span class=n>elem</span><span class=o>)</span> <span class=o>-&gt;</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableRow</span><span class=o>().</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;source&#34;</span><span class=o>,</span> <span class=n>elem</span><span class=o>.</span><span class=na>source</span><span class=o>).</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;quote&#34;</span><span class=o>,</span> <span class=n>elem</span><span class=o>.</span><span class=na>quote</span><span class=o>))</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withCreateDisposition</span><span class=o>(</span><span class=n>CreateDisposition</span><span class=o>.</span><span class=na>CREATE_IF_NEEDED</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withWriteDisposition</span><span class=o>(</span><span class=n>WriteDisposition</span><span class=o>.</span><span class=na>WRITE_TRUNCATE</span><span class=o>));</span></span></span></code></pre></div></div></div><p class=language-java>When you use streaming inserts, you can decide what to do with failed records.
You can either keep retrying, or return the failed records in a separate
<code>PCollection</code> using the <code>WriteResult.getFailedInserts()</code> method.</p><h3 id=storage-write-api>Using the Storage Write API</h3><p>Starting with version 2.36.0 of the Beam SDK for Java, you can use the
<a href=https://cloud.google.com/bigquery/docs/write-api>BigQuery Storage Write API</a>
from the BigQueryIO connector.</p><p>Starting with version 2.47.0 of the Beam SDK for Python, the Python SDK also supports the BigQuery Storage Write API.</p><p class=language-py>BigQuery Storage Write API for Python SDK currently has some limitations on supported data types. As this method makes use of cross-language transforms, we are limited to the types supported at the cross-language boundary. For example, <code>apache_beam.utils.timestamp.Timestamp</code> is needed to write a <code>TIMESTAMP</code> BigQuery type. Also, some types (e.g. <code>DATETIME</code>) are not supported yet. For more details, please refer to the <a href=https://github.com/apache/beam/blob/0b430748cdd2e25edc553747ce018195e9cce888/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L112-L123>full type mapping</a>.</p><span class=language-py><p><strong>Note:</strong> If you want to run WriteToBigQuery with Storage Write API from the source code, you need to run <code>./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build</code> to build the expansion-service jar. If you are running from a released Beam SDK, the jar is already included.</p><p><strong>Note:</strong> Auto sharding is not currently supported for Python&rsquo;s Storage Write API exactly-once mode on DataflowRunner.</p></span><h4 id=exactly-once-semantics>Exactly-once semantics</h4><p>To write to BigQuery using the Storage Write API, set <code>withMethod</code> to
<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.Method.html#STORAGE_WRITE_API><code>Method.STORAGE_WRITE_API</code></a>.
Here’s an example transform that writes to BigQuery using the Storage Write API and exactly-once semantics:</p><p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>WriteResult</span> <span class=n>writeResult</span> <span class=o>=</span> <span class=n>rows</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;Save Rows to BigQuery&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl><span class=n>BigQueryIO</span><span class=o>.</span><span class=na>writeTableRows</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>to</span><span class=o>(</span><span class=n>options</span><span class=o>.</span><span class=na>getFullyQualifiedTableName</span><span class=o>())</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withWriteDisposition</span><span class=o>(</span><span class=n>WriteDisposition</span><span class=o>.</span><span class=na>WRITE_APPEND</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withCreateDisposition</span><span class=o>(</span><span class=n>CreateDisposition</span><span class=o>.</span><span class=na>CREATE_NEVER</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withMethod</span><span class=o>(</span><span class=n>Method</span><span class=o>.</span><span class=na>STORAGE_WRITE_API</span><span class=o>)</span>
</span></span><span class=line><span class=cl><span class=o>);</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>quotes</span> <span class=o>|</span> <span class=s2>&#34;WriteTableWithStorageAPI&#34;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>WriteToBigQuery</span><span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>table_spec</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>schema</span><span class=o>=</span><span class=n>table_schema</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>method</span><span class=o>=</span><span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>WriteToBigQuery</span><span class=o>.</span><span class=n>Method</span><span class=o>.</span><span class=n>STORAGE_WRITE_API</span><span class=p>)</span></span></span></code></pre></div></div></div></p><p>If you want to change the behavior of BigQueryIO so that all the BigQuery sinks
for your pipeline use the Storage Write API by default, set the
<a href=https://github.com/apache/beam/blob/2c18ce0ccd7705473aa9ecc443dcdbe223dd9449/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java#L82-L86><code>UseStorageWriteApi</code> option</a>.</p><p>If your pipeline needs to create the table (in case it doesn’t exist and you
specified the create disposition as <code>CREATE_IF_NEEDED</code>), you must provide a
table schema. The API uses the schema to validate data and convert it to a
binary protocol.</p><p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>TableSchema</span> <span class=n>schema</span> <span class=o>=</span> <span class=k>new</span> <span class=n>TableSchema</span><span class=o>().</span><span class=na>setFields</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>List</span><span class=o>.</span><span class=na>of</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;request_ts&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;TIMESTAMP&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setMode</span><span class=o>(</span><span class=s>&#34;REQUIRED&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;user_name&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;STRING&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setMode</span><span class=o>(</span><span class=s>&#34;REQUIRED&#34;</span><span class=o>)));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>table_schema</span> <span class=o>=</span> <span class=p>{</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;fields&#39;</span><span class=p>:</span> <span class=p>[{</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;name&#39;</span><span class=p>:</span> <span class=s1>&#39;source&#39;</span><span class=p>,</span> <span class=s1>&#39;type&#39;</span><span class=p>:</span> <span class=s1>&#39;STRING&#39;</span><span class=p>,</span> <span class=s1>&#39;mode&#39;</span><span class=p>:</span> <span class=s1>&#39;NULLABLE&#39;</span>
</span></span><span class=line><span class=cl> <span class=p>},</span> <span class=p>{</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;name&#39;</span><span class=p>:</span> <span class=s1>&#39;quote&#39;</span><span class=p>,</span> <span class=s1>&#39;type&#39;</span><span class=p>:</span> <span class=s1>&#39;STRING&#39;</span><span class=p>,</span> <span class=s1>&#39;mode&#39;</span><span class=p>:</span> <span class=s1>&#39;REQUIRED&#39;</span>
</span></span><span class=line><span class=cl> <span class=p>}]</span>
</span></span><span class=line><span class=cl><span class=p>}</span></span></span></code></pre></div></div></div></p><p>For streaming pipelines, you need to set two additional parameters: the number
of streams and the triggering frequency.</p><p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>BigQueryIO</span><span class=o>.</span><span class=na>writeTableRows</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=c1>// ...
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>.</span><span class=na>withTriggeringFrequency</span><span class=o>(</span><span class=n>Duration</span><span class=o>.</span><span class=na>standardSeconds</span><span class=o>(</span><span class=n>5</span><span class=o>))</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withNumStorageWriteApiStreams</span><span class=o>(</span><span class=n>3</span><span class=o>)</span>
</span></span><span class=line><span class=cl><span class=o>);</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=c1># The Python SDK doesn&#39;t currently support setting the number of write streams</span>
</span></span><span class=line><span class=cl><span class=n>quotes</span> <span class=o>|</span> <span class=s2>&#34;StorageWriteAPIWithFrequency&#34;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>WriteToBigQuery</span><span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>table_spec</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>schema</span><span class=o>=</span><span class=n>table_schema</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>method</span><span class=o>=</span><span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>WriteToBigQuery</span><span class=o>.</span><span class=n>Method</span><span class=o>.</span><span class=n>STORAGE_WRITE_API</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>triggering_frequency</span><span class=o>=</span><span class=mi>5</span><span class=p>)</span></span></span></code></pre></div></div></div></p><p>The number of streams defines the parallelism of the BigQueryIO Write transform
and roughly corresponds to the number of Storage Write API streams that the
pipeline uses. You can set it explicitly on the transform via
<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withNumStorageWriteApiStreams-int-><code>withNumStorageWriteApiStreams</code></a>
or provide the <code>numStorageWriteApiStreams</code> option to the pipeline as defined in
<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.html><code>BigQueryOptions</code></a>.
Please note this is only supported for streaming pipelines.</p><p>Triggering frequency determines how soon the data is visible for querying in
BigQuery. You can explicitly set it via
<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda.time.Duration-><code>withTriggeringFrequency</code></a>
or specify the number of seconds by setting the
<code>storageWriteApiTriggeringFrequencySec</code> option.</p><p>The combination of these two parameters affects the size of the batches of rows
that BigQueryIO creates before calling the Storage Write API. Setting the
frequency too high can result in smaller batches, which can affect performance.
As a general rule, a single stream should be able to handle throughput of at
least 1 MB per second. Creating exclusive streams is an expensive operation for
the BigQuery service, so you should use only as many streams as needed for your
use case. Triggering frequency in single-digit seconds is a good choice for most
pipelines.</p><span class=language-java><p>Similar to streaming inserts, <code>STORAGE_WRITE_API</code> supports dynamically determining
the number of parallel streams to write to BigQuery (starting with Beam 2.42.0). You can
explicitly enable this using <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withAutoSharding--><code>withAutoSharding</code></a>.</p><p><code>STORAGE_WRITE_API</code> defaults to dynamic sharding when
<code>numStorageWriteApiStreams</code> is set to 0 or is unspecified.</p><p><em><strong>Note:</strong></em> Auto sharding with <code>STORAGE_WRITE_API</code> is supported by Dataflow, but <strong>not</strong> on Runner v2.</p></span><p>When using <code>STORAGE_WRITE_API</code>, the <code>PCollection</code> returned by
<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.html#getFailedStorageApiInserts--><code>WriteResult.getFailedStorageApiInserts</code></a>
contains the rows that failed to be written to the Storage Write API sink.</p><h4 id=at-least-once-semantics>At-least-once semantics</h4><p>If your use case allows for potential duplicate records in the target table, you
can use the
<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.Method.html#STORAGE_API_AT_LEAST_ONCE><code>STORAGE_API_AT_LEAST_ONCE</code></a>
method. This method doesn’t persist the records to be written to
BigQuery into its shuffle storage, which is needed to provide the exactly-once semantics
of the <code>STORAGE_WRITE_API</code> method. Therefore, for most pipelines, using this method is
less expensive and results in lower latency.
If you use <code>STORAGE_API_AT_LEAST_ONCE</code>, you don’t need to
specify the number of streams, and you can’t specify the triggering frequency.</p><p>Auto sharding is not applicable for <code>STORAGE_API_AT_LEAST_ONCE</code>.</p><p>When using <code>STORAGE_API_AT_LEAST_ONCE</code>, the <code>PCollection</code> returned by
<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.html#getFailedStorageApiInserts--><code>WriteResult.getFailedStorageApiInserts</code></a>
contains the rows that failed to be written to the Storage Write API sink.</p><h4 id=quotas>Quotas</h4><p>Before using the Storage Write API, be aware of the
<a href=https://cloud.google.com/bigquery/quotas#write-api-limits>BigQuery Storage Write API quotas</a>.</p><h3 id=using-dynamic-destinations>Using dynamic destinations</h3><p>You can use the dynamic destinations feature to write elements in a
<code>PCollection</code> to different BigQuery tables, possibly with different schemas.</p><p>The dynamic destinations feature groups your user type by a user-defined
destination key, uses the key to compute a destination table and/or schema, and
writes each group&rsquo;s elements to the computed destination.</p><p>You can also write your own types that have a mapping function to
<code>TableRow</code>, and you can use side inputs in all <code>DynamicDestinations</code> methods.</p><p class=language-java>To use dynamic destinations, you must create a <code>DynamicDestinations</code> object and
implement the following methods:</p><span class=language-java><ul><li><p><code>getDestination</code>: Returns an object that <code>getTable</code> and <code>getSchema</code> can use as
the destination key to compute the destination table and/or schema.</p></li><li><p><code>getTable</code>: Returns the table (as a <code>TableDestination</code> object) for the
destination key. This method must return a unique table for each unique
destination.</p></li><li><p><code>getSchema</code>: Returns the table schema (as a <code>TableSchema</code> object) for the
destination key.</p></li></ul></span><p class=language-java>Then, use <code>write().to</code> with your <code>DynamicDestinations</code> object. This example
uses a <code>PCollection</code> that contains weather data and writes the data into a
different table for each year.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=cm>/*
</span></span></span><span class=line><span class=cl><span class=cm>@DefaultCoder(AvroCoder.class)
</span></span></span><span class=line><span class=cl><span class=cm>static class WeatherData {
</span></span></span><span class=line><span class=cl><span class=cm> final long year;
</span></span></span><span class=line><span class=cl><span class=cm> final long month;
</span></span></span><span class=line><span class=cl><span class=cm> final long day;
</span></span></span><span class=line><span class=cl><span class=cm> final double maxTemp;
</span></span></span><span class=line><span class=cl><span class=cm>
</span></span></span><span class=line><span class=cl><span class=cm> public WeatherData() {
</span></span></span><span class=line><span class=cl><span class=cm> this.year = 0;
</span></span></span><span class=line><span class=cl><span class=cm> this.month = 0;
</span></span></span><span class=line><span class=cl><span class=cm> this.day = 0;
</span></span></span><span class=line><span class=cl><span class=cm> this.maxTemp = 0.0f;
</span></span></span><span class=line><span class=cl><span class=cm> }
</span></span></span><span class=line><span class=cl><span class=cm> public WeatherData(long year, long month, long day, double maxTemp) {
</span></span></span><span class=line><span class=cl><span class=cm> this.year = year;
</span></span></span><span class=line><span class=cl><span class=cm> this.month = month;
</span></span></span><span class=line><span class=cl><span class=cm> this.day = day;
</span></span></span><span class=line><span class=cl><span class=cm> this.maxTemp = maxTemp;
</span></span></span><span class=line><span class=cl><span class=cm> }
</span></span></span><span class=line><span class=cl><span class=cm>}
</span></span></span><span class=line><span class=cl><span class=cm>*/</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>WeatherData</span><span class=o>&gt;</span> <span class=n>weatherData</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=n>p</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>BigQueryIO</span><span class=o>.</span><span class=na>read</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=o>(</span><span class=n>SchemaAndRecord</span> <span class=n>elem</span><span class=o>)</span> <span class=o>-&gt;</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=n>GenericRecord</span> <span class=n>record</span> <span class=o>=</span> <span class=n>elem</span><span class=o>.</span><span class=na>getRecord</span><span class=o>();</span>
</span></span><span class=line><span class=cl> <span class=k>return</span> <span class=k>new</span> <span class=n>WeatherData</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=o>(</span><span class=n>Long</span><span class=o>)</span> <span class=n>record</span><span class=o>.</span><span class=na>get</span><span class=o>(</span><span class=s>&#34;year&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=o>(</span><span class=n>Long</span><span class=o>)</span> <span class=n>record</span><span class=o>.</span><span class=na>get</span><span class=o>(</span><span class=s>&#34;month&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=o>(</span><span class=n>Long</span><span class=o>)</span> <span class=n>record</span><span class=o>.</span><span class=na>get</span><span class=o>(</span><span class=s>&#34;day&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=o>(</span><span class=n>Double</span><span class=o>)</span> <span class=n>record</span><span class=o>.</span><span class=na>get</span><span class=o>(</span><span class=s>&#34;max_temperature&#34;</span><span class=o>));</span>
</span></span><span class=line><span class=cl> <span class=o>})</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>fromQuery</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;SELECT year, month, day, max_temperature &#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34;FROM [apache-beam-testing.samples.weather_stations] &#34;</span>
</span></span><span class=line><span class=cl> <span class=o>+</span> <span class=s>&#34;WHERE year BETWEEN 2007 AND 2009&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withCoder</span><span class=o>(</span><span class=n>AvroCoder</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>WeatherData</span><span class=o>.</span><span class=na>class</span><span class=o>)));</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=c1>// We will send the weather data into different tables for every year.
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>weatherData</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>BigQueryIO</span><span class=o>.&lt;</span><span class=n>WeatherData</span><span class=o>&gt;</span><span class=n>write</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>to</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>DynamicDestinations</span><span class=o>&lt;</span><span class=n>WeatherData</span><span class=o>,</span> <span class=n>Long</span><span class=o>&gt;()</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=nd>@Override</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=n>Long</span> <span class=nf>getDestination</span><span class=o>(</span><span class=n>ValueInSingleWindow</span><span class=o>&lt;</span><span class=n>WeatherData</span><span class=o>&gt;</span> <span class=n>elem</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=k>return</span> <span class=n>elem</span><span class=o>.</span><span class=na>getValue</span><span class=o>().</span><span class=na>year</span><span class=o>;</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=nd>@Override</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=n>TableDestination</span> <span class=nf>getTable</span><span class=o>(</span><span class=n>Long</span> <span class=n>destination</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=k>return</span> <span class=k>new</span> <span class=n>TableDestination</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableReference</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setProjectId</span><span class=o>(</span><span class=n>writeProject</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setDatasetId</span><span class=o>(</span><span class=n>writeDataset</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setTableId</span><span class=o>(</span><span class=n>writeTable</span> <span class=o>+</span> <span class=s>&#34;_&#34;</span> <span class=o>+</span> <span class=n>destination</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=s>&#34;Table for year &#34;</span> <span class=o>+</span> <span class=n>destination</span><span class=o>);</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl> <span class=nd>@Override</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=n>TableSchema</span> <span class=nf>getSchema</span><span class=o>(</span><span class=n>Long</span> <span class=n>destination</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=k>return</span> <span class=k>new</span> <span class=n>TableSchema</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setFields</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>ImmutableList</span><span class=o>.</span><span class=na>of</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;year&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;INTEGER&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setMode</span><span class=o>(</span><span class=s>&#34;REQUIRED&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;month&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;INTEGER&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setMode</span><span class=o>(</span><span class=s>&#34;REQUIRED&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;day&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;INTEGER&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setMode</span><span class=o>(</span><span class=s>&#34;REQUIRED&#34;</span><span class=o>),</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableFieldSchema</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setName</span><span class=o>(</span><span class=s>&#34;maxTemp&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;FLOAT&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>setMode</span><span class=o>(</span><span class=s>&#34;NULLABLE&#34;</span><span class=o>)));</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl> <span class=o>})</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withFormatFunction</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=o>(</span><span class=n>WeatherData</span> <span class=n>elem</span><span class=o>)</span> <span class=o>-&gt;</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableRow</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;year&#34;</span><span class=o>,</span> <span class=n>elem</span><span class=o>.</span><span class=na>year</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;month&#34;</span><span class=o>,</span> <span class=n>elem</span><span class=o>.</span><span class=na>month</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;day&#34;</span><span class=o>,</span> <span class=n>elem</span><span class=o>.</span><span class=na>day</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;maxTemp&#34;</span><span class=o>,</span> <span class=n>elem</span><span class=o>.</span><span class=na>maxTemp</span><span class=o>))</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withCreateDisposition</span><span class=o>(</span><span class=n>CreateDisposition</span><span class=o>.</span><span class=na>CREATE_IF_NEEDED</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withWriteDisposition</span><span class=o>(</span><span class=n>WriteDisposition</span><span class=o>.</span><span class=na>WRITE_TRUNCATE</span><span class=o>));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>fictional_characters_view</span> <span class=o>=</span> <span class=n>beam</span><span class=o>.</span><span class=n>pvalue</span><span class=o>.</span><span class=n>AsDict</span><span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>pipeline</span> <span class=o>|</span> <span class=s1>&#39;CreateCharacters&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>Create</span><span class=p>([(</span><span class=s1>&#39;Yoda&#39;</span><span class=p>,</span> <span class=kc>True</span><span class=p>),</span>
</span></span><span class=line><span class=cl> <span class=p>(</span><span class=s1>&#39;Obi Wan Kenobi&#39;</span><span class=p>,</span> <span class=kc>True</span><span class=p>)]))</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=k>def</span> <span class=nf>table_fn</span><span class=p>(</span><span class=n>element</span><span class=p>,</span> <span class=n>fictional_characters</span><span class=p>):</span>
</span></span><span class=line><span class=cl> <span class=k>if</span> <span class=n>element</span> <span class=ow>in</span> <span class=n>fictional_characters</span><span class=p>:</span>
</span></span><span class=line><span class=cl> <span class=k>return</span> <span class=s1>&#39;my_dataset.fictional_quotes&#39;</span>
</span></span><span class=line><span class=cl> <span class=k>else</span><span class=p>:</span>
</span></span><span class=line><span class=cl> <span class=k>return</span> <span class=s1>&#39;my_dataset.real_quotes&#39;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=n>quotes</span> <span class=o>|</span> <span class=s1>&#39;WriteWithDynamicDestination&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>WriteToBigQuery</span><span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>table_fn</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>schema</span><span class=o>=</span><span class=n>table_schema</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>table_side_inputs</span><span class=o>=</span><span class=p>(</span><span class=n>fictional_characters_view</span><span class=p>,</span> <span class=p>),</span>
</span></span><span class=line><span class=cl> <span class=n>write_disposition</span><span class=o>=</span><span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>BigQueryDisposition</span><span class=o>.</span><span class=n>WRITE_TRUNCATE</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>create_disposition</span><span class=o>=</span><span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>BigQueryDisposition</span><span class=o>.</span><span class=n>CREATE_IF_NEEDED</span><span class=p>)</span></span></span></code></pre></div></div></div><h3 id=using-time-partitioning>Using time partitioning</h3><p>BigQuery time partitioning divides your table into smaller partitions, which is
called a <a href=https://cloud.google.com/bigquery/docs/partitioned-tables>partitioned table</a>.
Partitioned tables make it easier for you to manage and query your data.</p><p class=language-java>To use BigQuery time partitioning, use one of these two methods:</p><span class=language-java><ul><li><p><code>withTimePartitioning</code>: This method takes a <code>TimePartitioning</code> object, and is
only usable if you are writing to a single table.</p></li><li><p><code>withJsonTimePartitioning</code>: This method is the same as
<code>withTimePartitioning</code>, but takes a JSON-serialized String object.</p></li></ul></span><p class=language-java>This example generates one partition per day.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>weatherData</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=n>BigQueryIO</span><span class=o>.&lt;</span><span class=n>WeatherData</span><span class=o>&gt;</span><span class=n>write</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>to</span><span class=o>(</span><span class=n>tableSpec</span> <span class=o>+</span> <span class=s>&#34;_partitioning&#34;</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withSchema</span><span class=o>(</span><span class=n>tableSchema</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withFormatFunction</span><span class=o>(</span>
</span></span><span class=line><span class=cl> <span class=o>(</span><span class=n>WeatherData</span> <span class=n>elem</span><span class=o>)</span> <span class=o>-&gt;</span>
</span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>TableRow</span><span class=o>()</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;year&#34;</span><span class=o>,</span> <span class=n>elem</span><span class=o>.</span><span class=na>year</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;month&#34;</span><span class=o>,</span> <span class=n>elem</span><span class=o>.</span><span class=na>month</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;day&#34;</span><span class=o>,</span> <span class=n>elem</span><span class=o>.</span><span class=na>day</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>set</span><span class=o>(</span><span class=s>&#34;maxTemp&#34;</span><span class=o>,</span> <span class=n>elem</span><span class=o>.</span><span class=na>maxTemp</span><span class=o>))</span>
</span></span><span class=line><span class=cl> <span class=c1>// NOTE: an existing table without time partitioning set up will not work
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>.</span><span class=na>withTimePartitioning</span><span class=o>(</span><span class=k>new</span> <span class=n>TimePartitioning</span><span class=o>().</span><span class=na>setType</span><span class=o>(</span><span class=s>&#34;DAY&#34;</span><span class=o>))</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withCreateDisposition</span><span class=o>(</span><span class=n>CreateDisposition</span><span class=o>.</span><span class=na>CREATE_IF_NEEDED</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>withWriteDisposition</span><span class=o>(</span><span class=n>WriteDisposition</span><span class=o>.</span><span class=na>WRITE_TRUNCATE</span><span class=o>));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>quotes</span> <span class=o>|</span> <span class=s1>&#39;WriteWithTimePartitioning&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>WriteToBigQuery</span><span class=p>(</span>
</span></span><span class=line><span class=cl> <span class=n>table_spec</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>schema</span><span class=o>=</span><span class=n>table_schema</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>write_disposition</span><span class=o>=</span><span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>BigQueryDisposition</span><span class=o>.</span><span class=n>WRITE_TRUNCATE</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>create_disposition</span><span class=o>=</span><span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>BigQueryDisposition</span><span class=o>.</span><span class=n>CREATE_IF_NEEDED</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=n>additional_bq_parameters</span><span class=o>=</span><span class=p>{</span><span class=s1>&#39;timePartitioning&#39;</span><span class=p>:</span> <span class=p>{</span>
</span></span><span class=line><span class=cl> <span class=s1>&#39;type&#39;</span><span class=p>:</span> <span class=s1>&#39;HOUR&#39;</span>
</span></span><span class=line><span class=cl> <span class=p>}})</span></span></span></code></pre></div></div></div><h2 id=limitations>Limitations</h2><p>BigQueryIO currently has the following limitations.</p><ol><li><p>You can’t sequence the completion of a BigQuery write with other steps of
your pipeline.</p></li><li><p>If you are using the Beam SDK for Python, you might have import size quota
issues if you write a very large dataset. As a workaround, you can partition
the dataset (for example, using Beam&rsquo;s <code>Partition</code> transform) and write to
multiple BigQuery tables. The Beam SDK for Java does not have this limitation
as it partitions your dataset for you.</p></li><li><p>When you <a href=https://cloud.google.com/bigquery/docs/loading-data>load data</a> into BigQuery, <a href=https://cloud.google.com/bigquery/quotas#load_jobs>these limits</a> are applied.
By default, BigQuery uses a shared pool of slots to load data.
This means that the available capacity is not guaranteed, and your load may be queued until
a slot becomes available. If a slot does not become available within 6 hours,
the load will fail due to the limits set by BigQuery. To avoid this situation,
it is highly recommended that you use <a href=https://cloud.google.com/bigquery/docs/reservations-intro#benefits_of_reservations>BigQuery reservations</a>,
which ensure that your load does not get queued and fail due to capacity issues.</p></li></ol><h2 id=additional-examples>Additional examples</h2><p>You can find additional examples that use BigQuery in Beam&rsquo;s examples
directories.</p><h3 id=java-cookbook-examples>Java cookbook examples</h3><p>These examples are from the Java <a href=https://github.com/apache/beam/tree/master/examples/java/src/main/java/org/apache/beam/examples/cookbook>cookbook examples</a>
directory.</p><ul><li><p><a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java>BigQueryTornadoes</a>
reads the public samples of weather data from BigQuery, counts the number of
tornadoes that occur in each month, and writes the results to a BigQuery
table.</p></li><li><p><a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java>CombinePerKeyExamples</a>
reads the public Shakespeare data from BigQuery, and for each word in the
dataset that exceeds a given length, generates a string containing the list of
play names in which that word appears. The pipeline then writes the results to
a BigQuery table.</p></li><li><p><a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java>FilterExamples</a>
reads public samples of weather data from BigQuery, performs a projection
on the data, finds the global mean of the temperature readings, filters on
readings for a single given month, and outputs only data (for that month)
that has a mean temperature lower than the derived global mean.</p></li><li><p><a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java>JoinExamples</a>
reads a sample of the <a href=https://goo.gl/OB6oin>GDELT &ldquo;world event&rdquo;</a> data from
BigQuery and joins the event <code>action</code> country code against a table that maps
country codes to country names.</p></li><li><p><a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java>MaxPerKeyExamples</a>
reads the public samples of weather data from BigQuery, finds the maximum
temperature for each month, and writes the results to a BigQuery table.</p></li><li><p><a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java>TriggerExample</a>
performs a streaming analysis of traffic data from San Diego freeways. The
pipeline looks at the data coming in from a text file and writes the results
to a BigQuery table.</p></li></ul><h3 id=java-complete-examples>Java complete examples</h3><p>These examples are from the Java <a href=https://github.com/apache/beam/tree/master/examples/java/src/main/java/org/apache/beam/examples/complete>complete examples</a>
directory.</p><ul><li><p><a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java>AutoComplete</a>
computes the most popular hashtags for every prefix, which can be used for
auto-completion. The pipeline can optionally write the results to a BigQuery
table.</p></li><li><p><a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java>StreamingWordExtract</a>
reads lines of text, splits each line into individual words, capitalizes those
words, and writes the output to a BigQuery table.</p></li><li><p><a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java>TrafficMaxLaneFlow</a>
reads traffic sensor data, finds the lane that had the highest recorded flow,
and writes the results to a BigQuery table.</p></li><li><p><a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java>TrafficRoutes</a>
reads traffic sensor data, calculates the average speed for each window and
looks for slowdowns in routes, and writes the results to a BigQuery table.</p></li></ul><h3 id=python-cookbook-examples>Python cookbook examples</h3><p>These examples are from the Python <a href=https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/cookbook>cookbook examples</a>
directory.</p><ul><li><p><a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py>BigQuery schema</a>
creates a <code>TableSchema</code> with nested and repeated fields, generates data with
nested and repeated fields, and writes the data to a BigQuery table.</p></li><li><p><a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py>BigQuery side inputs</a>
uses BigQuery sources as side inputs. It illustrates how to insert
side inputs into transforms in three different forms: as a singleton, as an
iterator, and as a list.</p></li><li><p><a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py>BigQuery tornadoes</a>
reads from a BigQuery table that has the &lsquo;month&rsquo; and &lsquo;tornado&rsquo; fields as part
of the table schema, computes the number of tornadoes in each month, and
outputs the results to a BigQuery table.</p></li><li><p><a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/filters.py>BigQuery filters</a>
reads weather station data from a BigQuery table, manipulates BigQuery rows in
memory, and writes the results to a BigQuery table.</p></li></ul><div class=feedback><p class=update>Last updated on 2024/03/28</p><h3>Have you found everything you were looking for?</h3><p class=description>Was it all useful and clear? Is there anything that you would like to change? Let us know!</p><button class=load-button><a href="https://docs.google.com/forms/d/e/1FAIpQLSfID7abne3GE6k6RdJIyZhPz2Gef7UkpggUEhTIDjjplHuxSA/viewform?usp=header_link" target=_blank>SEND FEEDBACK</a></button></div></div></div><footer class=footer><div class=footer__contained><div class=footer__cols><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col__logo><img src=/images/beam_logo_circle.svg class=footer__logo alt="Beam logo"></div><div class=footer__cols__col__logo><img src=/images/apache_logo_circle.svg class=footer__logo alt="Apache logo"></div></div><div class=footer-wrapper><div class=wrapper-grid><div class=footer__cols__col><div class=footer__cols__col__title>Start</div><div class=footer__cols__col__link><a href=/get-started/beam-overview/>Overview</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-java/>Quickstart (Java)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-py/>Quickstart (Python)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-go/>Quickstart (Go)</a></div><div class=footer__cols__col__link><a href=/get-started/downloads/>Downloads</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Docs</div><div class=footer__cols__col__link><a href=/documentation/programming-guide/>Concepts</a></div><div class=footer__cols__col__link><a href=/documentation/pipelines/design-your-pipeline/>Pipelines</a></div><div class=footer__cols__col__link><a href=/documentation/runners/capability-matrix/>Runners</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Community</div><div class=footer__cols__col__link><a 
href=/contribute/>Contribute</a></div><div class=footer__cols__col__link><a href=https://projects.apache.org/committee.html?beam target=_blank>Team<img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></div><div class=footer__cols__col__link><a href=/community/presentation-materials/>Media</a></div><div class=footer__cols__col__link><a href=/community/in-person/>Events/Meetups</a></div><div class=footer__cols__col__link><a href=/community/contact-us/>Contact Us</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Resources</div><div class=footer__cols__col__link><a href=/blog/>Blog</a></div><div class=footer__cols__col__link><a href=https://github.com/apache/beam>GitHub</a></div></div></div><div class=footer__bottom>&copy;
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://github.com/apache/beam><img src=/images/logos/social-icons/github-logo-150.png class=footer__logo alt="Github logo"></a></div><div class=footer__cols__col__logo><a href=https://www.linkedin.com/company/apache-beam/><img src=/images/logos/social-icons/linkedin-logo-150.png class=footer__logo alt="Linkedin logo"></a></div></div><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://twitter.com/apachebeam><img src=/images/logos/social-icons/twitter-logo-150.png class=footer__logo alt="Twitter logo"></a></div><div class=footer__cols__col__logo><a href=https://www.youtube.com/channel/UChNnb_YO_7B0HlW6FhAXZZQ><img src=/images/logos/social-icons/youtube-logo-150.png class=footer__logo alt="Youtube logo"></a></div></div></div></div></div></footer></body></html>