[Built-in I/O Transforms](/documentation/io/built-in/)

# Snowflake I/O

Pipeline options and general information about using and running Snowflake IO.

## Before you start

To use SnowflakeIO, add the Maven artifact dependency to your `pom.xml` file:

```xml
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-snowflake</artifactId>
    <version>2.56.0</version>
</dependency>
```
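If you build with Gradle rather than Maven, the same artifact can be declared with the equivalent coordinates; a sketch, assuming the standard `implementation` configuration:

```groovy
dependencies {
    implementation "org.apache.beam:beam-sdks-java-io-snowflake:2.56.0"
}
```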
Additional resources:

- [SnowflakeIO source code](https://github.com/apache/beam/tree/master/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake)
- [SnowflakeIO Javadoc](https://beam.apache.org/releases/javadoc/2.56.0/org/apache/beam/sdk/io/snowflake/SnowflakeIO.html)
- [Snowflake documentation](https://docs.snowflake.com/en/)

## Authentication

Reading and batch writing support the following authentication methods:

- Username and password
- Key pair
- OAuth token

Streaming writing supports only key pair authentication. For details, see [BEAM-3304](https://issues.apache.org/jira/browse/BEAM-3304).

Credentials are passed via the Pipeline options used to instantiate the `SnowflakeIO.DataSourceConfiguration` class. Each authentication method configures this class differently.

### Username and password

To use username/password authentication in SnowflakeIO, invoke your pipeline with the following Pipeline options:

```
--username=<USERNAME> --password=<PASSWORD>
```

Then use the options to instantiate the `SnowflakeIO.DataSourceConfiguration` class:

```java
SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create()
        .withUsernamePasswordAuth(
                options.getUsername(),
                options.getPassword())
        .withServerName(options.getServerName())
        .withDatabase(options.getDatabase())
        .withRole(options.getRole())
        .withWarehouse(options.getWarehouse())
        .withSchema(options.getSchema());
```

### Key pair

To use this authentication method, you must first generate a key pair and associate the public key with the Snowflake user that will connect using the IO transform. For instructions, see [Key Pair Authentication & Key Pair Rotation](https://docs.snowflake.com/en/user-guide/key-pair-auth.html) in the Snowflake documentation.

To use key pair authentication with SnowflakeIO, invoke your pipeline with one of the following sets of Pipeline options:

- Passing the key as a path:

  ```
  --username=<USERNAME> --privateKeyPath=<PATH_TO_P8_FILE> --privateKeyPassphrase=<PASSWORD_FOR_KEY>
  ```
  The initialization of a `SnowflakeIO.DataSourceConfiguration` class may be as follows:

  ```java
  SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create()
          .withKeyPairPathAuth(
                  options.getUsername(),
                  options.getPrivateKeyPath(),
                  options.getPrivateKeyPassphrase())
          .withServerName(options.getServerName())
          .withDatabase(options.getDatabase())
          .withRole(options.getRole())
          .withWarehouse(options.getWarehouse())
          .withSchema(options.getSchema());
  ```

- Passing the key as a value:

  ```
  --username=<USERNAME> --rawPrivateKey=<PRIVATE_KEY> --privateKeyPassphrase=<PASSWORD_FOR_KEY>
  ```
  The initialization of a `SnowflakeIO.DataSourceConfiguration` class may be as follows:

  ```java
  SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create()
          .withKeyPairRawAuth(
                  options.getUsername(),
                  options.getRawPrivateKey(),
                  options.getPrivateKeyPassphrase())
          .withServerName(options.getServerName())
          .withDatabase(options.getDatabase())
          .withRole(options.getRole())
          .withWarehouse(options.getWarehouse())
          .withSchema(options.getSchema());
  ```

### OAuth token

SnowflakeIO also supports OAuth token authentication.

**IMPORTANT**: SnowflakeIO requires a valid OAuth access token. It can neither refresh the token nor obtain one using a web-based flow. For information on configuring an OAuth integration and obtaining the token, see the [Snowflake documentation](https://docs.snowflake.com/en/user-guide/oauth-intro.html).

Once you have the token, invoke your pipeline with the following Pipeline options:

```
--oauthToken=<TOKEN>
```

The initialization of a `SnowflakeIO.DataSourceConfiguration` class may be as follows:
```java
SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration
        .create()
        .withOAuth(options.getOauthToken()) // pass the OAuth token from the pipeline options
        .withUrl(options.getUrl())
        .withServerName(options.getServerName())
        .withDatabase(options.getDatabase())
        .withWarehouse(options.getWarehouse())
        .withSchema(options.getSchema());
```
## DataSource Configuration

A DataSource configuration is required in both the read and write objects to configure the Snowflake connection properties for the IO.

### General usage

Create the DataSource configuration:

```java
SnowflakeIO.DataSourceConfiguration
        .create()
        .withUrl(options.getUrl())
        .withServerName(options.getServerName())
        .withDatabase(options.getDatabase())
        .withWarehouse(options.getWarehouse())
        .withSchema(options.getSchema());
```

Where the parameters can be:

- `.withUrl(...)`
  - JDBC-like URL for your Snowflake account, including account name and region, without any parameters.
  - Example: `.withUrl("jdbc:snowflake://account.snowflakecomputing.com")`
- `.withServerName(...)`
  - Server name - full server name with account, zone and domain.
  - Example: `.withServerName("account.snowflakecomputing.com")`
- `.withDatabase(...)`
  - Name of the Snowflake database to use.
  - Example: `.withDatabase("MY_DATABASE")`
- `.withWarehouse(...)`
  - Name of the Snowflake warehouse to use. This parameter is optional. If no warehouse name is specified, the default warehouse for the user is used.
  - Example: `.withWarehouse("MY_WAREHOUSE")`
- `.withSchema(...)`
  - Name of the schema in the database to use. This parameter is optional.
  - Example: `.withSchema("PUBLIC")`
- `.withUsernamePasswordAuth(username, password)`
  - Sets username/password authentication.
  - Example: `.withUsernamePasswordAuth("USERNAME", "PASSWORD")`
- `.withOAuth(token)`
  - Sets OAuth authentication.
  - Example: `.withOAuth("TOKEN")`
- `.withKeyPairAuth(username, privateKey)`
  - Sets key pair authentication using a username and a [PrivateKey](https://docs.oracle.com/javase/8/docs/api/java/security/PrivateKey.html); see the sketch after this list for one way to load the key.
  - Example: `.withKeyPairAuth("USERNAME", privateKey)`
- `.withKeyPairPathAuth(username, privateKeyPath, privateKeyPassphrase)`
  - Sets key pair authentication using a username, a path to a private key file and a passphrase.
  - Example: `.withKeyPairPathAuth("USERNAME", "PATH/TO/KEY.P8", "PASSPHRASE")`
- `.withKeyPairRawAuth(username, rawPrivateKey, privateKeyPassphrase)`
  - Sets key pair authentication using a username, a raw private key and a passphrase.
  - Example: `.withKeyPairRawAuth("USERNAME", "PRIVATE_KEY", "PASSPHRASE")`

**Note** - either `.withUrl(...)` or `.withServerName(...)` **is required**.
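For `.withKeyPairAuth(...)` you construct the `java.security.PrivateKey` yourself. A minimal sketch, assuming an unencrypted PKCS#8 PEM key at the hypothetical path `rsa_key.p8` (encrypted keys need extra decryption work, which `.withKeyPairPathAuth(...)` and `.withKeyPairRawAuth(...)` handle for you; exception handling omitted):

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyFactory;
import java.security.PrivateKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Base64;

// Read the PEM file and strip the header, footer and whitespace.
String pem = new String(Files.readAllBytes(Paths.get("rsa_key.p8")), StandardCharsets.UTF_8)
        .replace("-----BEGIN PRIVATE KEY-----", "")
        .replace("-----END PRIVATE KEY-----", "")
        .replaceAll("\\s", "");

// Decode the Base64 body into a PKCS#8 key spec and build the PrivateKey.
PrivateKey privateKey = KeyFactory.getInstance("RSA")
        .generatePrivate(new PKCS8EncodedKeySpec(Base64.getDecoder().decode(pem)));

SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create()
        .withKeyPairAuth(options.getUsername(), privateKey)
        .withServerName(options.getServerName())
        .withDatabase(options.getDatabase())
        .withSchema(options.getSchema());
```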
## Pipeline options

Use Beam's [Pipeline options](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/options/PipelineOptions.html) to set options via the command line.

### Snowflake Pipeline options

The SnowflakeIO library supports the following options, which can be passed via the [command line](/documentation/io/built-in/snowflake/#running-main-command-with-pipeline-options) by default when a pipeline uses them:

`--url` Snowflake's JDBC-like url including account name and region without any parameters.

`--serverName` Full server name with account, zone and domain.

`--username` Required for username/password and Private Key authentication.

`--oauthToken` Required for OAuth authentication only.

`--password` Required for username/password authentication only.

`--privateKeyPath` Path to Private Key file. Required for Private Key authentication only.

`--rawPrivateKey` Private Key. Required for Private Key authentication only.

`--privateKeyPassphrase` Private Key's passphrase. Required for Private Key authentication only.

`--stagingBucketName` External bucket path ending with `/`, e.g. `{gs,s3}://bucket/`. Sub-directories are allowed.

`--storageIntegrationName` Storage integration name.

`--warehouse` Warehouse to use. Optional.

`--database` Database name to connect to. Optional.

`--schema` Schema to use. Optional.

`--table` Table to use. Optional.

`--query` Query to use. Optional.

`--role` Role to use. Optional.

`--authenticator` Authenticator to use. Optional.

`--portNumber` Port number. Optional.

`--loginTimeout` Login timeout. Optional.

`--snowPipe` SnowPipe name. Optional.

### Running main command with Pipeline options

To pass Pipeline options via the command line, use `--args` in a gradle command as follows:
```
./gradlew run
    --args="
        --serverName=<SNOWFLAKE SERVER NAME>
            Example: --serverName=account.region.gcp.snowflakecomputing.com
        --username=<SNOWFLAKE USERNAME>
            Example: --username=testuser
        --password=<SNOWFLAKE PASSWORD>
            Example: --password=mypassword
        --database=<SNOWFLAKE DATABASE>
            Example: --database=TEST_DATABASE
        --schema=<SNOWFLAKE SCHEMA>
            Example: --schema=public
        --table=<SNOWFLAKE TABLE IN DATABASE>
            Example: --table=TEST_TABLE
        --query=<IF NOT TABLE THEN QUERY>
            Example: --query='SELECT column FROM TABLE'
        --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>
            Example: --storageIntegrationName=my_integration
        --stagingBucketName=<GCS OR S3 BUCKET>
            Example: --stagingBucketName={gs,s3}://bucket/
        --runner=<DirectRunner/DataflowRunner>
            Example: --runner=DataflowRunner
        --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME>
            Example: --project=my_project
        --tempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING WITH gs://…>
            Example: --tempLocation=gs://bucket/temp/
        --region=<FOR DATAFLOW RUNNER: GCP REGION>
            Example: --region=us-east1
        --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>
            Example: --appName=my_job"
```

In the code, the parameters passed as arguments are then accessible through the generated getters, for example `options.getStagingBucketName()`.
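Each such flag is matched by name to a getter/setter pair on a [PipelineOptions](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/options/PipelineOptions.html) interface (the SnowflakeIO module provides a `SnowflakePipelineOptions` interface with the Snowflake options predefined). A minimal sketch, assuming a hypothetical custom interface with a single option:

```java
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical interface: --stagingBucketName maps to the getter/setter below.
public interface MyOptions extends PipelineOptions {
  @Description("External bucket path ending with /")
  String getStagingBucketName();

  void setStagingBucketName(String value);
}

// In main(): parse the command-line arguments into the interface.
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
String bucket = options.getStagingBucketName();
```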
### Running test command with Pipeline options

To pass Pipeline options via the command line, use `-DintegrationTestPipelineOptions` in a gradle command as follows:

```
./gradlew test --tests nameOfTest
    -DintegrationTestPipelineOptions='[
        "--serverName=<SNOWFLAKE SERVER NAME>",
            Example: --serverName=account.region.gcp.snowflakecomputing.com
        "--username=<SNOWFLAKE USERNAME>",
            Example: --username=testuser
        "--password=<SNOWFLAKE PASSWORD>",
            Example: --password=mypassword
        "--schema=<SNOWFLAKE SCHEMA>",
            Example: --schema=PUBLIC
        "--table=<SNOWFLAKE TABLE IN DATABASE>",
            Example: --table=TEST_TABLE
        "--database=<SNOWFLAKE DATABASE>",
            Example: --database=TEST_DATABASE
        "--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>",
            Example: --storageIntegrationName=my_integration
        "--stagingBucketName=<GCS OR S3 BUCKET>",
            Example: --stagingBucketName={gs,s3}://bucket
        "--externalLocation=<GCS BUCKET URL STARTING WITH GS://>",
            Example: --externalLocation=gs://bucket/temp/
]' --no-build-cache
```

All parameters start with `--`, are surrounded by double quotation marks, and are separated by commas:

- `--serverName=<SNOWFLAKE SERVER NAME>`
  - Specifies the full name of your account (provided by Snowflake). Note that your full account name might include additional segments that identify the region and cloud platform where your account is hosted.
  - Example: `--serverName=xy12345.eu-west-1.gcp.snowflakecomputing.com`
- `--username=<SNOWFLAKE USERNAME>`
  - Specifies the login name of the user.
  - Example: `--username=my_username`
- `--password=<SNOWFLAKE PASSWORD>`
  - Specifies the password for the specified user.
  - Example: `--password=my_secret`
- `--schema=<SNOWFLAKE SCHEMA>`
  - Specifies the schema to use for the specified database once connected. The specified schema should be an existing schema for which the specified user's role has privileges.
  - Example: `--schema=PUBLIC`
- `--table=<SNOWFLAKE TABLE IN DATABASE>`
  - Example: `--table=MY_TABLE`
- `--database=<SNOWFLAKE DATABASE>`
  - Specifies the database to use once connected. The specified database should be an existing database for which the specified user's role has privileges.
  - Example: `--database=MY_DATABASE`
- `--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>`
  - Name of a storage integration created in [Snowflake](https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html) for the cloud storage of your choice.
  - Example: `--storageIntegrationName=my_google_integration`

## Running pipelines on Dataflow
By default, pipelines are run on the [Direct Runner](/documentation/runners/direct/) on your local machine. To run a pipeline on [Google Dataflow](https://cloud.google.com/dataflow/), you must provide the following Pipeline options:

- `--runner=DataflowRunner`
  - The Dataflow-specific runner.
- `--project=<GCP PROJECT>`
  - Name of the Google Cloud Platform project.
- `--stagingBucketName=<GCS OR S3 BUCKET>`
  - Google Cloud Storage bucket or AWS S3 bucket where the Beam files will be staged.
- `--maxNumWorkers=5`
  - (optional) Maximum number of workers.
- `--appName=<JOB NAME>`
  - (optional) Prefix for the job name in the Dataflow Dashboard.

More pipeline options for Dataflow can be found [here](https://beam.apache.org/releases/javadoc/current/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.html).

**Note**: To properly authenticate with Google Cloud, please use [gcloud](https://cloud.google.com/sdk/gcloud/) or follow the [Google Cloud documentation](https://cloud.google.com/docs/authentication/).

**Important**: Please review [Google Dataflow pricing](https://cloud.google.com/dataflow/pricing).

### Running pipeline templates on Dataflow

Google Dataflow supports [template](https://cloud.google.com/dataflow/docs/guides/templates/overview) creation, which means staging pipelines on Cloud Storage and running them with the ability to pass runtime parameters that are only available during pipeline execution.

The process of creating your own Dataflow template is as follows:

1. Create your own pipeline.
2. Create a [Dataflow template](https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#creating-and-staging-templates), checking which options SnowflakeIO supports at runtime.
3. Run the Dataflow template using the [Cloud Console](https://cloud.google.com/dataflow/docs/guides/templates/running-templates#using-the-cloud-console), the [REST API](https://cloud.google.com/dataflow/docs/guides/templates/running-templates#using-the-rest-api) or [gcloud](https://cloud.google.com/dataflow/docs/guides/templates/running-templates#using-gcloud), as sketched below.
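For example, a staged template could be launched with `gcloud` along these lines (a sketch: the job name, template location and parameter values are hypothetical; the parameter names must match the runtime options listed below):

```
gcloud dataflow jobs run my-snowflake-job \
    --gcs-location gs://bucket/templates/my-template \
    --region us-central1 \
    --parameters serverName=account.region.gcp.snowflakecomputing.com,table=TEST_TABLE
```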
Note: table is not in default pipeline options.</p></li><li><p><code>--query</code> Query to use. Optional. Note: query is not in default pipeline options.</p></li><li><p><code>--role</code> Role to use. Optional.</p></li><li><p><code>--snowPipe</code> SnowPipe name. Optional.</p></li></ul><p>Currently, SnowflakeIO <strong>doesn&rsquo;t support</strong> the following options at runtime:</p><ul><li><p><code>--url</code> Snowflake&rsquo;s JDBC-like URL including account name and region, without any parameters.</p></li><li><p><code>--oauthToken</code> Required for OAuth authentication only.</p></li><li><p><code>--privateKeyPath</code> Path to Private Key file. Required for Private Key authentication only.</p></li><li><p><code>--authenticator</code> Authenticator to use. Optional.</p></li><li><p><code>--portNumber</code> Port number. Optional.</p></li><li><p><code>--loginTimeout</code> Login timeout. Optional.</p></li></ul><h2 id=writing-to-snowflake-tables>Writing to Snowflake tables</h2><p>One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user&rsquo;s <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/values/PCollection.html>PCollection</a> to your Snowflake database.</p><h3 id=batch-write-from-a-bounded-source>Batch write (from a bounded source)</h3><p>The basic <code>.write()</code> operation usage is as follows:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>data.apply(
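    // dc is a SnowflakeIO.DataSourceConfiguration and mapper is a
    // SnowflakeIO.UserDataMapper, both described elsewhere in this guide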
SnowflakeIO.&lt;type&gt;write()
.withDataSourceConfiguration(dc)
.to(&#34;MY_TABLE&#34;)
.withStagingBucketName(&#34;BUCKET&#34;)
.withStorageIntegrationName(&#34;STORAGE INTEGRATION NAME&#34;)
.withUserDataMapper(mapper)
)</code></pre></div></div>Replace <code>type</code> with the data type of the <code>PCollection</code> object to write; for example, <code>SnowflakeIO.&lt;String></code> for an input <code>PCollection</code> of Strings.</p><p>All of the following parameters are required:</p><ul><li><p><code>.withDataSourceConfiguration()</code> Accepts a DataSourceConfiguration object.</p></li><li><p><code>.to()</code> Accepts the target Snowflake table name.</p></li><li><p><code>.withStagingBucketName()</code> Accepts a cloud bucket path ending with a slash.
Example: <code>.withStagingBucketName("{gs,s3}://bucket/my/dir/")</code></p></li><li><p><code>.withStorageIntegrationName()</code> Accepts the name of a Snowflake storage integration object created according to Snowflake documentation. Examples:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>CREATE OR REPLACE STORAGE INTEGRATION &#34;test_integration&#34;
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = (&#39;gcs://bucket/&#39;);</code></pre></div></div><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>CREATE STORAGE INTEGRATION &#34;test_integration&#34;
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = &#39;&lt;ARN ROLE NAME&gt;&#39;
STORAGE_ALLOWED_LOCATIONS = (&#39;s3://bucket/&#39;)</code></pre></div></div>Then:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>.withStorageIntegrationName(&#34;test_integration&#34;)</code></pre></div></div></p></li><li><p><code>.withUserDataMapper()</code> Accepts the UserDataMapper function that will map a user&rsquo;s PCollection to an array of String values <code>(String[])</code>.</p></li></ul><p><strong>Note</strong>:
SnowflakeIO uses <code>COPY</code> statements behind the scenes to write (using <a href=https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html>COPY to table</a>). StagingBucketName will be used to save CSV files, which will end up in Snowflake. Those CSV files will be saved under the “stagingBucketName” path.</p><p><strong>Optional</strong> for batching:</p><ul><li><code>.withQuotationMark()</code><ul><li>Default value: <code>&#39;</code> (single quotation mark).</li><li>Accepts a String of one character. It will surround all text (String) fields saved to CSV. It should be one of the characters accepted by <a href=https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html>Snowflake’s</a> <a href=https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html>FIELD_OPTIONALLY_ENCLOSED_BY</a> parameter (double quotation mark, single quotation mark or none).</li><li>Example: <code>.withQuotationMark("'")</code></li></ul></li></ul><h3 id=streaming-write-from-unbounded-source>Streaming write (from unbounded source)</h3><p>It is required to create a <a href=https://docs.snowflake.com/en/user-guide/data-load-snowpipe.html>SnowPipe</a> in the Snowflake console first. The SnowPipe should use the same integration and the same bucket as specified by the <code>.withStagingBucketName</code> and <code>.withStorageIntegrationName</code> methods. The write operation might look as follows:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>data.apply(
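    // streaming writes require key pair authentication and target a SnowPipe
    // (via .withSnowPipe()) instead of a table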
SnowflakeIO.&lt;type&gt;write()
.withStagingBucketName(&#34;BUCKET&#34;)
.withStorageIntegrationName(&#34;STORAGE INTEGRATION NAME&#34;)
.withDataSourceConfiguration(dc)
.withUserDataMapper(mapper)
.withSnowPipe(&#34;MY_SNOW_PIPE&#34;)
.withFlushTimeLimit(Duration.millis(time))
.withFlushRowLimit(rowsNumber)
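        // a flush is triggered by whichever limit (time or rows) is reached first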
.withShardsNumber(shardsNumber)
)</code></pre></div></div></p><h4 id=parameters>Parameters</h4><p><strong>Required</strong> for streaming:</p><ul><li><p><code>.withDataSourceConfiguration()</code></p><ul><li>Accepts a DatasourceConfiguration object.</li></ul></li><li><p><code>.to()</code></p><ul><li>Accepts the target Snowflake table name.</li><li>Example: <code>.to("MY_TABLE")</code></li></ul></li><li><p><code>.withStagingBucketName()</code></p><ul><li>Accepts a cloud bucket path ended with slash.</li><li>Example: <code>.withStagingBucketName("{gs,s3}://bucket/my/dir/")</code></li></ul></li><li><p><code>.withStorageIntegrationName()</code></p><ul><li>Accepts a name of a Snowflake storage integration object created according to Snowflake documentation.</li><li>Example:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>CREATE OR REPLACE STORAGE INTEGRATION &#34;test_integration&#34;
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = (&#39;gcs://bucket/&#39;);</code></pre></div></div><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>CREATE STORAGE INTEGRATION &#34;test_integration&#34;
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = &#39;&lt;ARN ROLE NAME&gt;&#39;
STORAGE_ALLOWED_LOCATIONS = (&#39;s3://bucket/&#39;)</code></pre></div></div>Then:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>.withStorageIntegrationName(&#34;test_integration&#34;)</code></pre></div></div></li></ul></li><li><p><code>.withSnowPipe()</code></p><ul><li><p>Accepts the target SnowPipe name: the exact name of the SnowPipe as created in Snowflake.
Example:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>CREATE OR REPLACE PIPE &#34;test_database&#34;.&#34;public&#34;.&#34;test_gcs_pipe&#34;
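  -- the pipe copies CSV files arriving on the stage into the target table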
AS COPY INTO stream_table from @streamstage;</code></pre></div></div></p></li><li><p>Then:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>.withSnowPipe(&#34;test_gcs_pipe&#34;)</code></pre></div></div></p></li></ul></li></ul><p><strong>Note</strong>: it is important to provide <strong>schema</strong> and <strong>database</strong> names.</p><ul><li><code>.withUserDataMapper()</code><ul><li>Accepts the <a href=/documentation/io/built-in/snowflake/#userdatamapper-function>UserDataMapper</a> function that will map a user&rsquo;s PCollection to an array of String values <code>(String[])</code>.</li></ul></li></ul><p><strong>Note</strong>:</p><p>As mentioned before, SnowflakeIO uses <a href=https://docs.snowflake.com/en/user-guide/data-load-snowpipe.html>SnowPipe REST calls</a>
behind the scenes for writing from unbounded sources. StagingBucketName will be used to save CSV files, which will end up in Snowflake.
SnowflakeIO does not delete the created CSV files from the “stagingBucketName” path, either during or after streaming.</p><p><strong>Optional</strong> for streaming:</p><ul><li><p><code>.withFlushTimeLimit()</code></p><ul><li>Default value: 30 seconds</li><li>Accepts a Duration object specifying the time interval after which the streaming write is repeated</li><li>Example: <code>.withFlushTimeLimit(Duration.millis(180000))</code></li></ul></li><li><p><code>.withFlushRowLimit()</code></p><ul><li>Default value: 10,000 rows</li><li>Limit of rows written to each staged file</li><li>Example: <code>.withFlushRowLimit(500000)</code></li></ul></li><li><p><code>.withShardsNumber()</code></p><ul><li>Default value: 1 shard</li><li>Number of files that will be saved in every flush (for purposes of parallel write).</li><li>Example: <code>.withShardsNumber(5)</code></li></ul></li><li><p><code>.withQuotationMark()</code></p><ul><li>Default value: <code>&#39;</code> (single quotation mark).</li><li>Accepts a String of one character. It will surround all text (String) fields saved to CSV. It should be one of the characters accepted by <a href=https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html>Snowflake’s</a> <a href=https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html>FIELD_OPTIONALLY_ENCLOSED_BY</a> parameter (double quotation mark, single quotation mark or none). Example: <code>.withQuotationMark("")</code> (no quotation marks)</li></ul></li><li><p><code>.withDebugMode()</code></p><ul><li>Accepts:<ul><li><code>SnowflakeIO.StreamingLogLevel.INFO</code> - shows full info about loaded files</li><li><code>SnowflakeIO.StreamingLogLevel.ERROR</code> - shows only errors.</li></ul></li><li>Shows logs about files streamed to Snowflake, similarly to <a href=https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis.html#endpoint-insertreport>insertReport</a>. Enabling debug mode may influence performance.</li><li>Example: <code>.withDebugMode(SnowflakeIO.StreamingLogLevel.INFO)</code></li></ul></li></ul><p><strong>Important notice</strong>:</p><ol><li>Streaming accepts only <strong>key pair authentication</strong>. For details, see: <a href=https://github.com/apache/beam/issues/21287>Issue 21287</a>.</li><li>The role parameter configured in the <code>SnowflakeIO.DataSourceConfiguration</code> object is ignored for streaming writes. For details, see: <a href=https://github.com/apache/beam/issues/21365>Issue 21365</a>.</li></ol><h4 id=flush-time-duration--number-of-rows>Flush time: duration & number of rows</h4><p>Duration: the streaming write periodically stages files according to the time duration specified in the flush time limit (for example, every 1 minute).</p><p>Number of rows: each staged file contains the number of rows specified in the flush row limit, unless the flush time limit is reached first (for example, if the limit is 1000 rows but the buffer has collected only 99 rows when the 1-minute flush time passes, those 99 rows are sent to SnowPipe for insertion).</p><p>The size of the staged files depends on the row size and the compression used (GZIP).</p><h3 id=userdatamapper-function>UserDataMapper function</h3><p>The <code>UserDataMapper</code> function is required to map data from a <code>PCollection</code> to an array of String values before the <code>write()</code> operation saves the data to temporary <code>.csv</code> files. 
For example:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>public static SnowflakeIO.UserDataMapper&lt;Long&gt; getCsvMapper() {
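  // each Long element of the PCollection becomes a single-column CSV row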
return (SnowflakeIO.UserDataMapper&lt;Long&gt;) recordLine -&gt; new String[] {recordLine.toString()};
}</code></pre></div></div></p><h3 id=additional-write-options>Additional write options</h3><h4 id=transformation-query>Transformation query</h4><p>The <code>.withQueryTransformation()</code> option for the <code>write()</code> operation accepts a SQL query as a String value, which will be performed while transferring data staged in CSV files directly to the target Snowflake table. For information about the transformation SQL syntax, see the <a href=https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html#transformation-parameters>Snowflake Documentation</a>.</p><p>Usage:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>String query = &#34;SELECT t.$1 from YOUR_TABLE;&#34;;
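// $1 refers to the first column of the data staged in the CSV files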
data.apply(
SnowflakeIO.&lt;~&gt;write()
.withDataSourceConfiguration(dc)
.to(&#34;MY_TABLE&#34;)
.withStagingBucketName(&#34;BUCKET&#34;)
.withStorageIntegrationName(&#34;STORAGE INTEGRATION NAME&#34;)
.withUserDataMapper(mapper)
.withQueryTransformation(query)
)</code></pre></div></div></p><h4 id=write-disposition>Write disposition</h4><p>Define the write behaviour for the target table by specifying the <code>.withWriteDisposition(...)</code> option for the <code>write()</code> operation. The following values are supported:</p><ul><li><p><code>APPEND</code> - Default behaviour. Written data is added to the existing rows in the table.</p></li><li><p><code>EMPTY</code> - The target table must be empty; otherwise, the write operation fails.</p></li><li><p><code>TRUNCATE</code> - The write operation deletes all rows from the target table before writing to it.</p></li></ul><p>Example of usage:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>data.apply(
SnowflakeIO.&lt;~&gt;write()
.withDataSourceConfiguration(dc)
.to(&#34;MY_TABLE&#34;)
.withStagingBucketName(&#34;BUCKET&#34;)
.withStorageIntegrationName(&#34;STORAGE INTEGRATION NAME&#34;)
.withUserDataMapper(mapper)
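        // TRUNCATE deletes all existing rows in MY_TABLE before the write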
.withWriteDisposition(TRUNCATE)
)</code></pre></div></div></p><h4 id=create-disposition>Create disposition</h4><p>The <code>.withCreateDisposition()</code> option defines the behaviour of the write operation if the target table does not exist. The following values are supported:</p><ul><li><p><code>CREATE_IF_NEEDED</code> - Default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table. Specify the schema for the target table using the <code>.withTableSchema()</code> option.</p></li><li><p><code>CREATE_NEVER</code> - The write operation fails if the target table does not exist.</p></li></ul><p>Usage:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>data.apply(
SnowflakeIO.&lt;~&gt;write()
.withDataSourceConfiguration(dc)
.to(&#34;MY_TABLE&#34;)
.withStagingBucketName(&#34;BUCKET&#34;)
.withStorageIntegrationName(&#34;STORAGE INTEGRATION NAME&#34;)
.withUserDataMapper(mapper)
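        // CREATE_NEVER makes the write fail if MY_TABLE does not already exist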
.withCreateDisposition(CREATE_NEVER)
)</code></pre></div></div></p><h4 id=table-schema-disposition>Table schema disposition</h4><p>When the <code>.withCreateDisposition()</code> option is set to <code>CREATE_IF_NEEDED</code>, the <code>.withTableSchema()</code> option enables specifying the schema for the created target table.
A table schema is a list of <code>SnowflakeColumn</code> objects, one per column in the table, each with a name and a type corresponding to the column’s data type.</p><p>Usage:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>SnowflakeTableSchema tableSchema =
new SnowflakeTableSchema(
SnowflakeColumn.of(&#34;my_date&#34;, new SnowflakeDate(), true),
new SnowflakeColumn(&#34;id&#34;, new SnowflakeNumber()),
SnowflakeColumn.of(&#34;name&#34;, new SnowflakeText(), true));
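// the boolean argument of SnowflakeColumn.of marks the column as nullable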
data.apply(
SnowflakeIO.&lt;~&gt;write()
.withDataSourceConfiguration(dc)
.to(&#34;MY_TABLE&#34;)
.withStagingBucketName(&#34;BUCKET&#34;)
.withStorageIntegrationName(&#34;STORAGE INTEGRATION NAME&#34;)
.withUserDataMapper(mapper)
.withTableSchema(tableSchema)
)</code></pre></div></div></p><h2 id=reading-from-snowflake>Reading from Snowflake</h2><p>One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. The output of the read transform is a <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/values/PCollection.html>PCollection</a> of a user-defined data type.</p><h3 id=general-usage-1>General usage</h3><p>The basic <code>.read()</code> operation usage:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>PCollection&lt;USER_DATA_TYPE&gt; items = pipeline.apply(
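    // mapper converts each CSV line (String[]) into USER_DATA_TYPE;
    // coder tells Beam how to encode that type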
SnowflakeIO.&lt;USER_DATA_TYPE&gt;read()
.withDataSourceConfiguration(dc)
.fromTable(&#34;MY_TABLE&#34;) // or .fromQuery(&#34;QUERY&#34;)
.withStagingBucketName(&#34;BUCKET&#34;)
.withStorageIntegrationName(&#34;STORAGE INTEGRATION NAME&#34;)
.withCsvMapper(mapper)
.withCoder(coder));
</code></pre></div></div>All of the following parameters are required:</p><ul><li><p><code>.withDataSourceConfiguration(...)</code></p><ul><li>Accepts a DataSourceConfiguration object.</li></ul></li><li><p><code>.fromTable(...)</code> or <code>.fromQuery(...)</code></p><ul><li>Specifies a Snowflake table name or custom SQL query.</li></ul></li><li><p><code>.withStagingBucketName()</code></p><ul><li>Accepts a cloud bucket name.</li></ul></li><li><p><code>.withStorageIntegrationName()</code> Accepts the name of a Snowflake storage integration object created according to Snowflake documentation. Example:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>CREATE OR REPLACE STORAGE INTEGRATION test_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = (&#39;gcs://bucket/&#39;);</code></pre></div></div><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>CREATE STORAGE INTEGRATION test_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = &#39;&lt;ARN ROLE NAME&gt;&#39;
STORAGE_ALLOWED_LOCATIONS = (&#39;s3://bucket/&#39;)</code></pre></div></div>Then:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>.withStorageIntegrationName(&#34;test_integration&#34;)</code></pre></div></div></p></li><li><p><code>.withCsvMapper(mapper)</code></p><ul><li>Accepts a <a href=/documentation/io/built-in/snowflake/#csvmapper>CSVMapper</a> instance for mapping String[] to USER_DATA_TYPE.</li></ul></li><li><p><code>.withCoder(coder)</code></p><ul><li>Accepts the <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/coders/Coder.html>Coder</a> for USER_DATA_TYPE.</li></ul></li></ul><p><strong>Note</strong>:
SnowflakeIO uses <code>COPY</code> statements behind the scenes to read (using <a href=https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html>COPY to location</a>) files staged in cloud storage. StagingBucketName will be used as a temporary location for storing CSV files. Those temporary directories will be named <code>sf_copy_csv_DATE_TIME_RANDOMSUFFIX</code> and they will be removed automatically once the read operation finishes.</p><h3 id=csvmapper>CSVMapper</h3><p>SnowflakeIO uses a <a href=https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html>COPY INTO &lt;location&gt;</a> statement to move data from a Snowflake table to GCS/S3 as CSV files. These files are then downloaded via <a href=https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/io/FileIO.html>FileIO</a> and processed line by line. Each line is split into an array of Strings using the <a href=https://opencsv.sourceforge.net/>OpenCSV</a> library.</p><p>The CsvMapper’s job is to let the user convert the array of Strings to a user-defined type, e.g. a GenericRecord for Avro or Parquet files, or a custom POJO.</p><p>Example implementation of CsvMapper for GenericRecord:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>static SnowflakeIO.CsvMapper&lt;GenericRecord&gt; getCsvMapper() {
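  // PARQUET_SCHEMA is assumed to be an Avro Schema object defined elsewhere by the user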
return (SnowflakeIO.CsvMapper&lt;GenericRecord&gt;)
parts -&gt; {
return new GenericRecordBuilder(PARQUET_SCHEMA)
.set(&#34;ID&#34;, Long.valueOf(parts[0]))
.set(&#34;NAME&#34;, parts[1])
        // [...] set the remaining fields the same way
.build();
};
}</code></pre></div></div></p><h2 id=using-snowflakeio-with-aws-s3>Using SnowflakeIO with AWS S3</h2><p>To be able to use an AWS S3 bucket as the <code>stagingBucketName</code>, it is required to:</p><ol><li>Create a <code>PipelineOptions</code> interface <a href=/documentation/io/built-in/snowflake/#extending-pipeline-options>extending</a> <code>SnowflakePipelineOptions</code> and <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/options/S3Options.html>S3Options</a>
with <code>AwsAccessKey</code> and <code>AwsSecretKey</code> options. Example:</li></ol><p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>public interface AwsPipelineOptions extends SnowflakePipelineOptions, S3Options {
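    // the getter/setter pairs below become the --awsAccessKey and --awsSecretKey pipeline options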
@Description(&#34;AWS Access Key&#34;)
@Default.String(&#34;access_key&#34;)
String getAwsAccessKey();
void setAwsAccessKey(String awsAccessKey);
@Description(&#34;AWS secret key&#34;)
@Default.String(&#34;secret_key&#34;)
String getAwsSecretKey();
void setAwsSecretKey(String awsSecretKey);
}</code></pre></div></div>2. Set the <code>AwsCredentialsProvider</code> option using the <code>AwsAccessKey</code> and <code>AwsSecretKey</code> options.</p><p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>options.setAwsCredentialsProvider(
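    // AWSStaticCredentialsProvider and BasicAWSCredentials come from the AWS SDK (com.amazonaws.auth)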
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey())
)
);</code></pre></div></div>3. Create the pipeline.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>Pipeline p = Pipeline.create(options);</code></pre></div></div><p><strong>Note</strong>: Remember to set <code>awsRegion</code> from <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/options/S3Options.html>S3Options</a>.</p><h2 id=using-snowflakeio-in-python-sdk>Using SnowflakeIO in Python SDK</h2><h3 id=intro>Intro</h3><p>The Snowflake cross-language implementation supports both reading and writing operations for the Python programming language, thanks to
the cross-language feature, which is part of the <a href=/roadmap/portability/>Portability Framework Roadmap</a> and aims to provide full interoperability
across the Beam ecosystem. From a developer&rsquo;s perspective, it means the possibility of combining transforms written in different languages (Java/Python/Go).</p><p>For more information about cross-language, please see the <a href=/roadmap/connectors-multi-sdk/>multi-SDK efforts</a>
and <a href=/roadmap/connectors-multi-sdk/#cross-language-transforms-api-and-expansion-service>Cross-language transforms API and expansion service</a> articles.</p><p>Additional resources:</p><ul><li><a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/snowflake.py>SnowflakeIO source code</a></li><li><a href=https://beam.apache.org/releases/pydoc/2.56.0/apache_beam.io.snowflake.html>SnowflakeIO Pydoc</a></li><li><a href=https://docs.snowflake.com/en>Snowflake documentation</a></li></ul><h3 id=reading-from-snowflake-1>Reading from Snowflake</h3><p>One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. The output of the read transform is a <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.pvalue.html#apache_beam.pvalue.PCollection>PCollection</a> of a user-defined data type.</p><h4 id=general-usage-2>General usage</h4><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>OPTIONS = [&#34;--runner=FlinkRunner&#34;]
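# cross-language transforms require a runner that supports them, e.g. Flink or Dataflow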
with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
(p
| ReadFromSnowflake(...)
| &lt;FURTHER TRANSFORMS&gt;)</code></pre></div></div><h4 id=required-parameters>Required parameters</h4><ul><li><p><code>server_name</code> Full Snowflake server name with an account, zone, and domain.</p></li><li><p><code>schema</code> Name of the Snowflake schema in the database to use.</p></li><li><p><code>database</code> Name of the Snowflake database to use.</p></li><li><p><code>staging_bucket_name</code> Name of the Google Cloud Storage bucket or AWS S3 bucket. The bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named <code>sf_copy_csv_DATE_TIME_RANDOMSUFFIX</code> and they will be removed automatically once the read operation finishes.</p></li><li><p><code>storage_integration_name</code> The name of a Snowflake storage integration object created according to <a href=https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html>Snowflake documentation</a>.</p></li><li><p><code>csv_mapper</code> Specifies a function which must translate an array of strings to a user-defined object. SnowflakeIO uses a <a href=https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html>COPY INTO &lt;location&gt;</a> statement to move data from a Snowflake table to GCS/S3 as CSV files. These files are then downloaded via <a href=https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/io/FileIO.html>FileIO</a> and processed line by line. Each line is split into an array of Strings using the <a href=https://opencsv.sourceforge.net/>OpenCSV</a> library. The csv_mapper function&rsquo;s job is to let the user convert the array of Strings to a user-defined type, e.g. a GenericRecord for Avro or Parquet files, or a custom object.
Example:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>def csv_mapper(strings_array):
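    # User is a user-defined class; each CSV field arrives as a string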
return User(strings_array[0], int(strings_array[1]))</code></pre></div></div></p></li><li><p><code>table</code> or <code>query</code> Specifies a Snowflake table name or custom SQL query.</p></li></ul><h4 id=authentication-parameters>Authentication parameters</h4><p>It’s required to pass one of the following combinations of valid parameters for authentication:</p><ul><li><p><code>username</code> and <code>password</code> Specifies the username and password for the username/password authentication method.</p></li><li><p><code>private_key_path</code> and <code>private_key_passphrase</code> Specifies a path to the private key and a passphrase for the key pair authentication method.</p></li><li><p><code>raw_private_key</code> and <code>private_key_passphrase</code> Specifies a private key and a passphrase for the key pair authentication method.</p></li><li><p><code>o_auth_token</code> Specifies an access token for the OAuth authentication method.</p></li></ul><h4 id=additional-parameters>Additional parameters</h4><ul><li><p><code>role</code> Specifies the Snowflake role. If not specified, the user&rsquo;s default will be used.</p></li><li><p><code>warehouse</code> Specifies the Snowflake warehouse name. If not specified, the user&rsquo;s default will be used.</p></li><li><p><code>expansion_service</code> Specifies the URL of the expansion service.</p></li></ul><h3 id=writing-to-snowflake>Writing to Snowflake</h3><p>One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user&rsquo;s <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.pvalue.html#apache_beam.pvalue.PCollection>PCollection</a> to your Snowflake database.</p><h4 id=general-usage-3>General usage</h4><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>OPTIONS = [&#34;--runner=FlinkRunner&#34;]
with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
(p
| &lt;SOURCE OF DATA&gt;
| WriteToSnowflake(
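        # note: pass parameters for exactly one authentication method;
        # the full list below is only illustrative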
server_name=&lt;SNOWFLAKE SERVER NAME&gt;,
username=&lt;SNOWFLAKE USERNAME&gt;,
password=&lt;SNOWFLAKE PASSWORD&gt;,
o_auth_token=&lt;OAUTH TOKEN&gt;,
private_key_path=&lt;PATH TO P8 FILE&gt;,
raw_private_key=&lt;PRIVATE_KEY&gt;,
private_key_passphrase=&lt;PASSWORD FOR KEY&gt;,
schema=&lt;SNOWFLAKE SCHEMA&gt;,
database=&lt;SNOWFLAKE DATABASE&gt;,
staging_bucket_name=&lt;GCS OR S3 BUCKET&gt;,
storage_integration_name=&lt;SNOWFLAKE STORAGE INTEGRATION NAME&gt;,
create_disposition=&lt;CREATE DISPOSITION&gt;,
write_disposition=&lt;WRITE DISPOSITION&gt;,
table_schema=&lt;SNOWFLAKE TABLE SCHEMA&gt;,
user_data_mapper=&lt;USER DATA MAPPER FUNCTION&gt;,
table=&lt;SNOWFLAKE TABLE&gt;,
query=&lt;IF NOT TABLE THEN QUERY&gt;,
role=&lt;SNOWFLAKE ROLE&gt;,
warehouse=&lt;SNOWFLAKE WAREHOUSE&gt;,
expansion_service=&lt;EXPANSION SERVICE ADDRESS&gt;))</code></pre></div></div><h4 id=required-parameters-1>Required parameters</h4><ul><li><p><code>server_name</code> Full Snowflake server name with account, zone and domain.</p></li><li><p><code>schema</code> Name of the Snowflake schema in the database to use.</p></li><li><p><code>database</code> Name of the Snowflake database to use.</p></li><li><p><code>staging_bucket_name</code> Path to the Google Cloud Storage bucket or AWS S3 bucket, ending with a slash. The bucket will be used to save CSV files, which will end up in Snowflake. Those CSV files will be saved under the “staging_bucket_name” path.</p></li><li><p><code>storage_integration_name</code> The name of a Snowflake storage integration object created according to <a href=https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html>Snowflake documentation</a>.</p></li><li><p><code>user_data_mapper</code> Specifies a function which maps data from a PCollection to an array of String values before the write operation saves the data to temporary .csv files.
Example:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>def user_data_mapper(user):
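    # every value must be rendered as a string for the CSV file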
return [user.name, str(user.age)]</code></pre></div></div></p></li><li><p><code>table</code> or <code>query</code> Specifies a Snowflake table name or custom SQL query.</p></li></ul><h4 id=authentication-parameters-1>Authentication parameters</h4><p>It’s required to pass one of the following combinations of valid parameters for authentication:</p><ul><li><p><code>username</code> and <code>password</code> Specifies the username/password authentication method.</p></li><li><p><code>private_key_path</code> and <code>private_key_passphrase</code> Specifies a path to the private key and a passphrase for the key pair authentication method.</p></li><li><p><code>raw_private_key</code> and <code>private_key_passphrase</code> Specifies a private key and a passphrase for the key pair authentication method.</p></li><li><p><code>o_auth_token</code> Specifies an access token for the OAuth authentication method.</p></li></ul><h4 id=additional-parameters-1>Additional parameters</h4><ul><li><p><code>role</code> Specifies the Snowflake role. If not specified, the user&rsquo;s default will be used.</p></li><li><p><code>warehouse</code> Specifies the Snowflake warehouse name. If not specified, the user&rsquo;s default will be used.</p></li><li><p><code>create_disposition</code> Defines the behaviour of the write operation if the target table does not exist. The following values are supported:</p><ul><li><code>CREATE_IF_NEEDED</code> - Default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table. Specify the schema for the target table using the table_schema parameter.</li><li><code>CREATE_NEVER</code> - The write operation fails if the target table does not exist.</li></ul></li><li><p><code>write_disposition</code> Defines the write behaviour for the target table. The following values are supported:</p><ul><li><code>APPEND</code> - Default behaviour. Written data is added to the existing rows in the table.</li><li><code>EMPTY</code> - The target table must be empty; otherwise, the write operation fails.</li><li><code>TRUNCATE</code> - The write operation deletes all rows from the target table before writing to it.</li></ul></li><li><p><code>table_schema</code> When the <code>create_disposition</code> parameter is set to CREATE_IF_NEEDED, the table_schema parameter enables specifying the schema for the created target table. A table schema is a JSON array with the following structure:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>{&#34;schema&#34;: [
{
&#34;dataType&#34;:{&#34;type&#34;:&#34;&lt;COLUMN DATA TYPE&gt;&#34;},
&#34;name&#34;:&#34;&lt;COLUMN NAME&gt; &#34;,
&#34;nullable&#34;: &lt;NULLABLE&gt;
},
...
]}</code></pre></div></div>All supported data types:<div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>{&#34;type&#34;:&#34;date&#34;},
{&#34;type&#34;:&#34;datetime&#34;},
{&#34;type&#34;:&#34;time&#34;},
{&#34;type&#34;:&#34;timestamp&#34;},
{&#34;type&#34;:&#34;timestamp_ltz&#34;},
{&#34;type&#34;:&#34;timestamp_ntz&#34;},
{&#34;type&#34;:&#34;timestamp_tz&#34;},
{&#34;type&#34;:&#34;boolean&#34;},
{&#34;type&#34;:&#34;decimal&#34;,&#34;precision&#34;:38,&#34;scale&#34;:1},
{&#34;type&#34;:&#34;double&#34;},
{&#34;type&#34;:&#34;float&#34;},
{&#34;type&#34;:&#34;integer&#34;,&#34;precision&#34;:38,&#34;scale&#34;:0},
{&#34;type&#34;:&#34;number&#34;,&#34;precision&#34;:38,&#34;scale&#34;:1},
{&#34;type&#34;:&#34;numeric&#34;,&#34;precision&#34;:38,&#34;scale&#34;:2},
{&#34;type&#34;:&#34;real&#34;},
{&#34;type&#34;:&#34;array&#34;},
{&#34;type&#34;:&#34;object&#34;},
{&#34;type&#34;:&#34;variant&#34;},
{&#34;type&#34;:&#34;binary&#34;,&#34;size&#34;:null},
{&#34;type&#34;:&#34;char&#34;,&#34;length&#34;:1},
{&#34;type&#34;:&#34;string&#34;,&#34;length&#34;:null},
{&#34;type&#34;:&#34;text&#34;,&#34;length&#34;:null},
{&#34;type&#34;:&#34;varbinary&#34;,&#34;size&#34;:null},
{&#34;type&#34;:&#34;varchar&#34;,&#34;length&#34;:100}]</code></pre></div></div>You can read about Snowflake data types at <a href=https://docs.snowflake.com/en/sql-reference/data-types.html>Snowflake data types</a>.</p></li><li><p><code>expansion_service</code> Specifies the URL of the expansion service.</p></li></ul><h2 id=limitations>Limitations</h2><p>SnowflakeIO currently has the following limitations:</p><ol><li><p>Streaming writes support only key pair authentication. For details, see: <a href=https://github.com/apache/beam/issues/21287>Issue 21287</a>.</p></li><li><p>The role parameter configured in the <code>SnowflakeIO.DataSourceConfiguration</code> object is ignored for streaming writes. For details, see: <a href=https://github.com/apache/beam/issues/21365>Issue 21365</a>.</p></li></ol><div class=feedback><p class=update>Last updated on 2024/05/03</p><h3>Have you found everything you were looking for?</h3><p class=description>Was it all useful and clear? Is there anything that you would like to change? Let us know!</p><button class=load-button><a href="https://docs.google.com/forms/d/e/1FAIpQLSfID7abne3GE6k6RdJIyZhPz2Gef7UkpggUEhTIDjjplHuxSA/viewform?usp=header_link" target=_blank>SEND FEEDBACK</a></button></div></div></div><footer class=footer><div class=footer__contained><div class=footer__cols><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col__logo><img src=/images/beam_logo_circle.svg class=footer__logo alt="Beam logo"></div><div class=footer__cols__col__logo><img src=/images/apache_logo_circle.svg class=footer__logo alt="Apache logo"></div></div><div class=footer-wrapper><div class=wrapper-grid><div class=footer__cols__col><div class=footer__cols__col__title>Start</div><div class=footer__cols__col__link><a href=/get-started/beam-overview/>Overview</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-java/>Quickstart (Java)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-py/>Quickstart (Python)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-go/>Quickstart (Go)</a></div><div class=footer__cols__col__link><a href=/get-started/downloads/>Downloads</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Docs</div><div class=footer__cols__col__link><a href=/documentation/programming-guide/>Concepts</a></div><div class=footer__cols__col__link><a href=/documentation/pipelines/design-your-pipeline/>Pipelines</a></div><div class=footer__cols__col__link><a href=/documentation/runners/capability-matrix/>Runners</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Community</div><div class=footer__cols__col__link><a href=/contribute/>Contribute</a></div><div class=footer__cols__col__link><a href=https://projects.apache.org/committee.html?beam target=_blank>Team<img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></div><div class=footer__cols__col__link><a href=/community/presentation-materials/>Media</a></div><div class=footer__cols__col__link><a href=/community/in-person/>Events/Meetups</a></div><div class=footer__cols__col__link><a href=/community/contact-us/>Contact Us</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Resources</div><div class=footer__cols__col__link><a href=/blog/>Blog</a></div><div class=footer__cols__col__link><a href=https://github.com/apache/beam>GitHub</a></div></div></div><div class=footer__bottom>&copy;
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://github.com/apache/beam><img src=/images/logos/social-icons/github-logo-150.png class=footer__logo alt="Github logo"></a></div><div class=footer__cols__col__logo><a href=https://www.linkedin.com/company/apache-beam/><img src=/images/logos/social-icons/linkedin-logo-150.png class=footer__logo alt="Linkedin logo"></a></div></div><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://twitter.com/apachebeam><img src=/images/logos/social-icons/twitter-logo-150.png class=footer__logo alt="Twitter logo"></a></div><div class=footer__cols__col__logo><a href=https://www.youtube.com/channel/UChNnb_YO_7B0HlW6FhAXZZQ><img src=/images/logos/social-icons/youtube-logo-150.png class=footer__logo alt="Youtube logo"></a></div></div></div></div></div></footer></body></html>