Source code for apache_beam.io.gcp.bigquery_file_loads (Apache Beam 2.47.0)

<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd">Functionality to perform file loads into BigQuery for Batch and Streaming</span>
<span class="sd">pipelines.</span>
<span class="sd">This source is able to work around BigQuery load quotas and limitations. When</span>
<span class="sd">destinations are dynamic, or when data for a single job is too large, the data</span>
<span class="sd">will be split into multiple jobs.</span>
<span class="sd">NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="c1"># pytype: skip-file</span>
<span class="kn">import</span> <span class="nn">hashlib</span>
<span class="kn">import</span> <span class="nn">io</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">random</span>
<span class="kn">import</span> <span class="nn">time</span>
<span class="kn">import</span> <span class="nn">uuid</span>
<span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">pvalue</span>
<span class="kn">from</span> <span class="nn">apache_beam.io</span> <span class="kn">import</span> <span class="n">filesystems</span> <span class="k">as</span> <span class="n">fs</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp</span> <span class="kn">import</span> <span class="n">bigquery_tools</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.bigquery_io_metadata</span> <span class="kn">import</span> <span class="n">create_bigquery_io_metadata</span>
<span class="kn">from</span> <span class="nn">apache_beam.options</span> <span class="kn">import</span> <span class="n">value_provider</span> <span class="k">as</span> <span class="n">vp</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="kn">import</span> <span class="n">GoogleCloudOptions</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">trigger</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="kn">import</span> <span class="n">DisplayDataItem</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.util</span> <span class="kn">import</span> <span class="n">GroupIntoBatches</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">GlobalWindows</span>
<span class="c1"># Protect against environments where bigquery library is not available.</span>
<span class="c1"># pylint: disable=wrong-import-order, wrong-import-position</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">apitools.base.py.exceptions</span> <span class="kn">import</span> <span class="n">HttpError</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="k">pass</span>
<span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>
<span class="n">ONE_TERABYTE</span> <span class="o">=</span> <span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">40</span><span class="p">)</span>
<span class="c1"># The maximum file size for imports is 5TB. We keep our files under that.</span>
<span class="n">_DEFAULT_MAX_FILE_SIZE</span> <span class="o">=</span> <span class="mi">4</span> <span class="o">*</span> <span class="n">ONE_TERABYTE</span>
<span class="n">_DEFAULT_MAX_WRITERS_PER_BUNDLE</span> <span class="o">=</span> <span class="mi">20</span>
<span class="c1"># The maximum size for a single load job is one terabyte</span>
<span class="n">_MAXIMUM_LOAD_SIZE</span> <span class="o">=</span> <span class="mi">15</span> <span class="o">*</span> <span class="n">ONE_TERABYTE</span>
<span class="c1"># Big query only supports up to 10 thousand URIs for a single load job.</span>
<span class="n">_MAXIMUM_SOURCE_URIS</span> <span class="o">=</span> <span class="mi">10</span> <span class="o">*</span> <span class="mi">1000</span>
<span class="c1"># If triggering_frequency is supplied, we will trigger the file write after</span>
<span class="c1"># this many records are written.</span>
<span class="n">_FILE_TRIGGERING_RECORD_COUNT</span> <span class="o">=</span> <span class="mi">500000</span>
<span class="c1"># If using auto-sharding for unbounded data, we batch the records before</span>
<span class="c1"># triggering file write to avoid generating too many small files.</span>
<span class="n">_FILE_TRIGGERING_BATCHING_DURATION_SECS</span> <span class="o">=</span> <span class="mi">1</span>
<span class="c1"># How many seconds we wait before polling a pending job</span>
<span class="n">_SLEEP_DURATION_BETWEEN_POLLS</span> <span class="o">=</span> <span class="mi">10</span>
<span class="k">def</span> <span class="nf">_generate_job_name</span><span class="p">(</span><span class="n">job_name</span><span class="p">,</span> <span class="n">job_type</span><span class="p">,</span> <span class="n">step_name</span><span class="p">):</span>
<span class="k">return</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">generate_bq_job_name</span><span class="p">(</span>
<span class="n">job_name</span><span class="o">=</span><span class="n">job_name</span><span class="p">,</span>
<span class="n">step_id</span><span class="o">=</span><span class="n">step_name</span><span class="p">,</span>
<span class="n">job_type</span><span class="o">=</span><span class="n">job_type</span><span class="p">,</span>
<span class="n">random</span><span class="o">=</span><span class="n">random</span><span class="o">.</span><span class="n">randint</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="mi">1000</span><span class="p">))</span>
<div class="viewcode-block" id="file_prefix_generator"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.file_prefix_generator">[docs]</a><span class="k">def</span> <span class="nf">file_prefix_generator</span><span class="p">(</span>
<span class="n">with_validation</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">pipeline_gcs_location</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">temp_location</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">_generate_file_prefix</span><span class="p">(</span><span class="n">unused_elm</span><span class="p">):</span>
<span class="c1"># If a gcs location is provided to the pipeline, then we shall use that.</span>
<span class="c1"># Otherwise, we shall use the temp_location from pipeline options.</span>
<span class="n">gcs_base</span> <span class="o">=</span> <span class="n">pipeline_gcs_location</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">gcs_base</span><span class="p">:</span>
<span class="n">gcs_base</span> <span class="o">=</span> <span class="n">temp_location</span>
<span class="c1"># This will fail at pipeline execution time, but will fail early, as this</span>
<span class="c1"># step doesn&#39;t have any dependencies (and thus will be one of the first</span>
<span class="c1"># stages to be run).</span>
<span class="k">if</span> <span class="n">with_validation</span> <span class="ow">and</span> <span class="p">(</span><span class="ow">not</span> <span class="n">gcs_base</span> <span class="ow">or</span> <span class="ow">not</span> <span class="n">gcs_base</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;gs://&#39;</span><span class="p">)):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;Invalid GCS location: </span><span class="si">%r</span><span class="s1">.</span><span class="se">\n</span><span class="s1">&#39;</span>
<span class="s1">&#39;Writing to BigQuery with FILE_LOADS method requires a&#39;</span>
<span class="s1">&#39; GCS location to be provided to write files to be loaded&#39;</span>
<span class="s1">&#39; into BigQuery. Please provide a GCS bucket through&#39;</span>
<span class="s1">&#39; custom_gcs_temp_location in the constructor of WriteToBigQuery&#39;</span>
<span class="s1">&#39; or the fallback option --temp_location, or pass&#39;</span>
<span class="s1">&#39; method=&quot;STREAMING_INSERTS&quot; to WriteToBigQuery.&#39;</span> <span class="o">%</span> <span class="n">gcs_base</span><span class="p">)</span>
<span class="n">prefix_uuid</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">()</span>
<span class="k">return</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">gcs_base</span><span class="p">,</span> <span class="s1">&#39;bq_load&#39;</span><span class="p">,</span> <span class="n">prefix_uuid</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_generate_file_prefix</span></div>
<span class="k">def</span> <span class="nf">_make_new_file_writer</span><span class="p">(</span>
<span class="n">file_prefix</span><span class="p">,</span>
<span class="n">destination</span><span class="p">,</span>
<span class="n">file_format</span><span class="p">,</span>
<span class="n">schema</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">schema_side_inputs</span><span class="o">=</span><span class="nb">tuple</span><span class="p">()):</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">get_hashable_destination</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="c1"># Windows does not allow : on filenames. Replacing with underscore.</span>
<span class="c1"># Other disallowed characters are:</span>
<span class="c1"># https://docs.microsoft.com/en-us/windows/desktop/fileio/naming-a-file</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">destination</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;:&#39;</span><span class="p">,</span> <span class="s1">&#39;.&#39;</span><span class="p">)</span>
<span class="n">directory</span> <span class="o">=</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">exists</span><span class="p">(</span><span class="n">directory</span><span class="p">):</span>
<span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">mkdirs</span><span class="p">(</span><span class="n">directory</span><span class="p">)</span>
<span class="n">file_name</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span>
<span class="n">file_path</span> <span class="o">=</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">,</span> <span class="n">file_name</span><span class="p">)</span>
<span class="k">if</span> <span class="n">file_format</span> <span class="o">==</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">FileFormat</span><span class="o">.</span><span class="n">AVRO</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">callable</span><span class="p">(</span><span class="n">schema</span><span class="p">):</span>
<span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span><span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">schema</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span>
<span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="n">writer</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">AvroRowWriter</span><span class="p">(</span>
<span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">create</span><span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="s2">&quot;application/avro&quot;</span><span class="p">),</span> <span class="n">schema</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">file_format</span> <span class="o">==</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">FileFormat</span><span class="o">.</span><span class="n">JSON</span><span class="p">:</span>
<span class="n">writer</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">JsonRowWriter</span><span class="p">(</span>
<span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">create</span><span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="s2">&quot;application/text&quot;</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">((</span>
<span class="s1">&#39;Only AVRO and JSON are supported as intermediate formats for &#39;</span>
<span class="s1">&#39;BigQuery WriteRecordsToFile, got: </span><span class="si">{}</span><span class="s1">.&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">file_format</span><span class="p">))</span>
<span class="k">return</span> <span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span>
<span class="k">def</span> <span class="nf">_bq_uuid</span><span class="p">(</span><span class="n">seed</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">seed</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s2">&quot;-&quot;</span><span class="p">,</span> <span class="s2">&quot;&quot;</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="n">hashlib</span><span class="o">.</span><span class="n">md5</span><span class="p">(</span><span class="n">seed</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">hexdigest</span><span class="p">())</span>
<span class="k">class</span> <span class="nc">_ShardDestinations</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Adds a shard number to the key of the KV element.</span>
<span class="sd"> Experimental; no backwards compatibility guarantees.&quot;&quot;&quot;</span>
<span class="n">DEFAULT_SHARDING_FACTOR</span> <span class="o">=</span> <span class="mi">10</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sharding_factor</span><span class="o">=</span><span class="n">DEFAULT_SHARDING_FACTOR</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sharding_factor</span> <span class="o">=</span> <span class="n">sharding_factor</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_shard_count</span> <span class="o">=</span> <span class="n">random</span><span class="o">.</span><span class="n">randrange</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sharding_factor</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">row</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="n">sharded_destination</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">destination</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_shard_count</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">sharding_factor</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_shard_count</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">sharded_destination</span><span class="p">,</span> <span class="n">row</span><span class="p">)</span>
<div class="viewcode-block" id="WriteRecordsToFile"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile">[docs]</a><span class="k">class</span> <span class="nc">WriteRecordsToFile</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Write input records to files before triggering a load job.</span>
<span class="sd"> This transform keeps up to ``max_files_per_bundle`` files open to write to. It</span>
<span class="sd"> receives (destination, record) tuples, and it writes the records to different</span>
<span class="sd"> files for each destination.</span>
<span class="sd"> If there are more than ``max_files_per_bundle`` destinations that we need to</span>
<span class="sd"> write to, then those records are grouped by their destination, and later</span>
<span class="sd"> written to files by ``WriteGroupedRecordsToFile``.</span>
<span class="sd"> It outputs two PCollections.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">UNWRITTEN_RECORD_TAG</span> <span class="o">=</span> <span class="s1">&#39;UnwrittenRecords&#39;</span>
<span class="n">WRITTEN_FILE_TAG</span> <span class="o">=</span> <span class="s1">&#39;WrittenFiles&#39;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">schema</span><span class="p">,</span>
<span class="n">max_files_per_bundle</span><span class="o">=</span><span class="n">_DEFAULT_MAX_WRITERS_PER_BUNDLE</span><span class="p">,</span>
<span class="n">max_file_size</span><span class="o">=</span><span class="n">_DEFAULT_MAX_FILE_SIZE</span><span class="p">,</span>
<span class="n">file_format</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Initialize a :class:`WriteRecordsToFile`.</span>
<span class="sd"> Args:</span>
<span class="sd"> max_files_per_bundle (int): The maximum number of files that can be kept</span>
<span class="sd"> open during execution of this step in a worker. This is to avoid over-</span>
<span class="sd"> whelming the worker memory.</span>
<span class="sd"> max_file_size (int): The maximum size in bytes for a file to be used in</span>
<span class="sd"> an export job.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span> <span class="o">=</span> <span class="n">max_files_per_bundle</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span> <span class="o">=</span> <span class="n">max_file_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_format</span> <span class="o">=</span> <span class="n">file_format</span> <span class="ow">or</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">FileFormat</span><span class="o">.</span><span class="n">JSON</span>
<div class="viewcode-block" id="WriteRecordsToFile.display_data"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">&#39;max_files_per_bundle&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span><span class="p">,</span>
<span class="s1">&#39;max_file_size&#39;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">),</span>
<span class="s1">&#39;file_format&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_format</span><span class="p">,</span>
<span class="p">}</span></div>
<div class="viewcode-block" id="WriteRecordsToFile.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span> <span class="o">=</span> <span class="p">{}</span></div>
<div class="viewcode-block" id="WriteRecordsToFile.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">file_prefix</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Take a tuple with (destination, row) and write to file or spill out.</span>
<span class="sd"> Destination may be a ``TableReference`` or a string, and row is a</span>
<span class="sd"> Python dictionary for a row to be inserted to BigQuery.&quot;&quot;&quot;</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">get_hashable_destination</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="n">row</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="k">if</span> <span class="n">destination</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">)</span> <span class="o">&lt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">[</span><span class="n">destination</span><span class="p">]</span> <span class="o">=</span> <span class="n">_make_new_file_writer</span><span class="p">(</span>
<span class="n">file_prefix</span><span class="p">,</span>
<span class="n">destination</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_format</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span>
<span class="n">schema_side_inputs</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span>
<span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">UNWRITTEN_RECORD_TAG</span><span class="p">,</span> <span class="n">element</span><span class="p">)</span>
<span class="k">return</span>
<span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span><span class="p">)</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">[</span><span class="n">destination</span><span class="p">]</span>
<span class="c1"># TODO(pabloem): Is it possible for this to throw exception?</span>
<span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">row</span><span class="p">)</span>
<span class="n">file_size</span> <span class="o">=</span> <span class="n">writer</span><span class="o">.</span><span class="n">tell</span><span class="p">()</span>
<span class="k">if</span> <span class="n">file_size</span> <span class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">:</span>
<span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span>
<span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">,</span>
<span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">)))</span></div>
<div class="viewcode-block" id="WriteRecordsToFile.finish_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.finish_bundle">[docs]</a> <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">for</span> <span class="n">destination</span><span class="p">,</span> <span class="n">file_path_writer</span> <span class="ow">in</span> \
<span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span><span class="p">)</span> <span class="o">=</span> <span class="n">file_path_writer</span>
<span class="n">file_size</span> <span class="o">=</span> <span class="n">writer</span><span class="o">.</span><span class="n">tell</span><span class="p">()</span>
<span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span>
<span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">,</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">((</span><span class="n">destination</span><span class="p">,</span> <span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">))))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span> <span class="o">=</span> <span class="p">{}</span></div></div>
<div class="viewcode-block" id="WriteGroupedRecordsToFile"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteGroupedRecordsToFile">[docs]</a><span class="k">class</span> <span class="nc">WriteGroupedRecordsToFile</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Receives collection of dest-iterable(records), writes it to files.</span>
<span class="sd"> This is different from ``WriteRecordsToFile`` because it receives records</span>
<span class="sd"> grouped by destination. This means that it&#39;s not necessary to keep multiple</span>
<span class="sd"> file descriptors open, because we know for sure when records for a single</span>
<span class="sd"> destination have been written out.</span>
<span class="sd"> Experimental; no backwards compatibility guarantees.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">schema</span><span class="p">,</span> <span class="n">max_file_size</span><span class="o">=</span><span class="n">_DEFAULT_MAX_FILE_SIZE</span><span class="p">,</span> <span class="n">file_format</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span> <span class="o">=</span> <span class="n">max_file_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_format</span> <span class="o">=</span> <span class="n">file_format</span> <span class="ow">or</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">FileFormat</span><span class="o">.</span><span class="n">JSON</span>
<div class="viewcode-block" id="WriteGroupedRecordsToFile.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteGroupedRecordsToFile.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">file_prefix</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">):</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">get_hashable_destination</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="n">rows</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span>
<span class="k">for</span> <span class="n">row</span> <span class="ow">in</span> <span class="n">rows</span><span class="p">:</span>
<span class="k">if</span> <span class="n">writer</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span><span class="p">)</span> <span class="o">=</span> <span class="n">_make_new_file_writer</span><span class="p">(</span>
<span class="n">file_prefix</span><span class="p">,</span>
<span class="n">destination</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_format</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span>
<span class="n">schema_side_inputs</span><span class="p">)</span>
<span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">row</span><span class="p">)</span>
<span class="n">file_size</span> <span class="o">=</span> <span class="n">writer</span><span class="o">.</span><span class="n">tell</span><span class="p">()</span>
<span class="k">if</span> <span class="n">file_size</span> <span class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">:</span>
<span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">))</span>
<span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">writer</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">))</span></div></div>
<div class="viewcode-block" id="UpdateDestinationSchema"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema">[docs]</a><span class="k">class</span> <span class="nc">UpdateDestinationSchema</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Update destination schema based on data that is about to be copied into it.</span>
<span class="sd"> Unlike load and query jobs, BigQuery copy jobs do not support schema field</span>
<span class="sd"> addition or relaxation on the destination table. This DoFn fills that gap by</span>
<span class="sd"> updating the destination table schemas to be compatible with the data coming</span>
<span class="sd"> from the source table so that schema field modification options are respected</span>
<span class="sd"> regardless of whether data is loaded directly to the destination table or</span>
<span class="sd"> loaded into temporary tables before being copied into the destination.</span>
<span class="sd"> This transform takes as input a (destination, job_reference) pair where the</span>
<span class="sd"> job_reference refers to a completed load job into a temporary table.</span>
<span class="sd"> This transform emits (destination, job_reference) pairs where the</span>
<span class="sd"> job_reference refers to a submitted load job for performing the schema</span>
<span class="sd"> modification in JSON format. Note that the input and output job references</span>
<span class="sd"> are not the same.</span>
<span class="sd"> Experimental; no backwards compatibility guarantees.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">project</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">additional_bq_parameters</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">step_name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">load_job_project_id</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="n">project</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_test_client</span> <span class="o">=</span> <span class="n">test_client</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span> <span class="o">=</span> <span class="n">additional_bq_parameters</span> <span class="ow">or</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span> <span class="o">=</span> <span class="n">step_name</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_load_job_project_id</span> <span class="o">=</span> <span class="n">load_job_project_id</span>
<div class="viewcode-block" id="UpdateDestinationSchema.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_test_client</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_bq_io_metadata</span> <span class="o">=</span> <span class="n">create_bigquery_io_metadata</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span> <span class="o">=</span> <span class="p">[]</span></div>
<div class="viewcode-block" id="UpdateDestinationSchema.display_data"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">&#39;write_disposition&#39;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_write_disposition</span><span class="p">),</span>
<span class="s1">&#39;additional_bq_params&#39;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span><span class="p">),</span>
<span class="p">}</span></div>
<div class="viewcode-block" id="UpdateDestinationSchema.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">schema_mod_job_name_prefix</span><span class="p">):</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">temp_table_load_job_reference</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="k">if</span> <span class="nb">callable</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span><span class="p">):</span>
<span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span>
<span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span>
<span class="c1"># When writing to normal tables WRITE_TRUNCATE will overwrite the schema but</span>
<span class="c1"># when writing to a partition, care needs to be taken to update the schema</span>
<span class="c1"># even on WRITE_TRUNCATE.</span>
<span class="k">if</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_write_disposition</span> <span class="ow">not</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">&#39;WRITE_TRUNCATE&#39;</span><span class="p">,</span> <span class="s1">&#39;WRITE_APPEND&#39;</span><span class="p">)</span> <span class="ow">or</span>
<span class="ow">not</span> <span class="n">additional_parameters</span> <span class="ow">or</span>
<span class="ow">not</span> <span class="n">additional_parameters</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">&quot;schemaUpdateOptions&quot;</span><span class="p">)):</span>
<span class="c1"># No need to modify schema of destination table</span>
<span class="k">return</span>
<span class="n">table_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">if</span> <span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span>
<span class="s1">&#39;project&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">project</span>
<span class="k">try</span><span class="p">:</span>
<span class="c1"># Check if destination table exists</span>
<span class="n">destination_table</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">get_table</span><span class="p">(</span>
<span class="n">project_id</span><span class="o">=</span><span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">dataset_id</span><span class="o">=</span><span class="n">table_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span>
<span class="n">table_id</span><span class="o">=</span><span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)</span>
<span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">exn</span><span class="p">:</span>
<span class="k">if</span> <span class="n">exn</span><span class="o">.</span><span class="n">status_code</span> <span class="o">==</span> <span class="mi">404</span><span class="p">:</span>
<span class="c1"># Destination table does not exist, so no need to modify its schema</span>
<span class="c1"># ahead of the copy jobs.</span>
<span class="k">return</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span>
<span class="n">temp_table_load_job</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">get_job</span><span class="p">(</span>
<span class="n">project</span><span class="o">=</span><span class="n">temp_table_load_job_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">job_id</span><span class="o">=</span><span class="n">temp_table_load_job_reference</span><span class="o">.</span><span class="n">jobId</span><span class="p">,</span>
<span class="n">location</span><span class="o">=</span><span class="n">temp_table_load_job_reference</span><span class="o">.</span><span class="n">location</span><span class="p">)</span>
<span class="n">temp_table_schema</span> <span class="o">=</span> <span class="n">temp_table_load_job</span><span class="o">.</span><span class="n">configuration</span><span class="o">.</span><span class="n">load</span><span class="o">.</span><span class="n">schema</span>
<span class="k">if</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">check_schema_equal</span><span class="p">(</span><span class="n">temp_table_schema</span><span class="p">,</span>
<span class="n">destination_table</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span>
<span class="n">ignore_descriptions</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">ignore_field_order</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
<span class="c1"># Destination table schema is already the same as the temp table schema,</span>
<span class="c1"># so no need to run a job to update the destination table schema.</span>
<span class="k">return</span>
<span class="n">destination_hash</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">(</span>
<span class="s1">&#39;</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">.</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">))</span>
<span class="n">uid</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">()</span>
<span class="n">job_name</span> <span class="o">=</span> <span class="s1">&#39;</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">schema_mod_job_name_prefix</span><span class="p">,</span> <span class="n">destination_hash</span><span class="p">,</span> <span class="n">uid</span><span class="p">)</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;Triggering schema modification job </span><span class="si">%s</span><span class="s1"> on </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">job_name</span><span class="p">,</span>
<span class="n">table_reference</span><span class="p">)</span>
<span class="c1"># Trigger potential schema modification by loading zero rows into the</span>
<span class="c1"># destination table with the temporary table schema.</span>
<span class="n">schema_update_job_reference</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">perform_load_job</span><span class="p">(</span>
<span class="n">destination</span><span class="o">=</span><span class="n">table_reference</span><span class="p">,</span>
<span class="n">source_stream</span><span class="o">=</span><span class="n">io</span><span class="o">.</span><span class="n">BytesIO</span><span class="p">(),</span> <span class="c1"># file with zero rows</span>
<span class="n">job_id</span><span class="o">=</span><span class="n">job_name</span><span class="p">,</span>
<span class="n">schema</span><span class="o">=</span><span class="n">temp_table_schema</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="s1">&#39;WRITE_APPEND&#39;</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="s1">&#39;CREATE_NEVER&#39;</span><span class="p">,</span>
<span class="n">additional_load_parameters</span><span class="o">=</span><span class="n">additional_parameters</span><span class="p">,</span>
<span class="n">job_labels</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_bq_io_metadata</span><span class="o">.</span><span class="n">add_additional_bq_job_labels</span><span class="p">(),</span>
<span class="c1"># JSON format is hardcoded because zero rows load(unlike AVRO) and</span>
<span class="c1"># a nested schema(unlike CSV, which a default one) is permitted.</span>
<span class="n">source_format</span><span class="o">=</span><span class="s2">&quot;NEWLINE_DELIMITED_JSON&quot;</span><span class="p">,</span>
<span class="n">load_job_project_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_load_job_project_id</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="o">.</span><span class="n">append</span><span class="p">(</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span>
<span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">schema_update_job_reference</span><span class="p">)))</span></div>
<div class="viewcode-block" id="UpdateDestinationSchema.finish_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema.finish_bundle">[docs]</a> <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># Unlike the other steps, schema update is not always necessary.</span>
<span class="c1"># In that case, return a None value to avoid blocking in streaming context.</span>
<span class="c1"># Otherwise, the streaming pipeline would get stuck waiting for the</span>
<span class="c1"># TriggerCopyJobs side-input.</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="p">:</span>
<span class="k">return</span> <span class="p">[</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="kc">None</span><span class="p">)]</span>
<span class="k">for</span> <span class="n">windowed_value</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="p">:</span>
<span class="n">job_ref</span> <span class="o">=</span> <span class="n">windowed_value</span><span class="o">.</span><span class="n">value</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">wait_for_bq_job</span><span class="p">(</span>
<span class="n">job_ref</span><span class="p">,</span> <span class="n">sleep_duration_sec</span><span class="o">=</span><span class="n">_SLEEP_DURATION_BETWEEN_POLLS</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span></div></div>
<div class="viewcode-block" id="TriggerCopyJobs"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs">[docs]</a><span class="k">class</span> <span class="nc">TriggerCopyJobs</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Launches jobs to copy from temporary tables into the main target table.</span>
<span class="sd"> When a job needs to write to multiple destination tables, or when a single</span>
<span class="sd"> destination table needs to have multiple load jobs to write to it, files are</span>
<span class="sd"> loaded into temporary tables, and those tables are later copied to the</span>
<span class="sd"> destination tables.</span>
<span class="sd"> This transform emits (destination, job_reference) pairs.</span>
<span class="sd"> TODO(BEAM-7822): In file loads method of writing to BigQuery,</span>
<span class="sd"> copying from temp_tables to destination_table is not atomic.</span>
<span class="sd"> See: https://issues.apache.org/jira/browse/BEAM-7822</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">TRIGGER_DELETE_TEMP_TABLES</span> <span class="o">=</span> <span class="s1">&#39;TriggerDeleteTempTables&#39;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">project</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">step_name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">load_job_project_id</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="n">project</span>
<span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="n">create_disposition</span>
<span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span>
<span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_observed_tables</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span> <span class="o">=</span> <span class="n">step_name</span>
<span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span> <span class="o">=</span> <span class="n">load_job_project_id</span>
<div class="viewcode-block" id="TriggerCopyJobs.display_data"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">&#39;launchesBigQueryJobs&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span>
<span class="kc">True</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s2">&quot;This Dataflow job launches bigquery jobs.&quot;</span><span class="p">)</span>
<span class="p">}</span></div>
<div class="viewcode-block" id="TriggerCopyJobs.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_observed_tables</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span> <span class="o">=</span> <span class="n">create_bigquery_io_metadata</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span> <span class="o">=</span> <span class="p">[]</span></div>
<div class="viewcode-block" id="TriggerCopyJobs.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">element_list</span><span class="p">,</span> <span class="n">job_name_prefix</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">unused_schema_mod_jobs</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element_list</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">):</span>
<span class="c1"># Allow this for streaming update compatibility while fixing BEAM-24535.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">process_one</span><span class="p">(</span><span class="n">element_list</span><span class="p">,</span> <span class="n">job_name_prefix</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">for</span> <span class="n">element</span> <span class="ow">in</span> <span class="n">element_list</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">process_one</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">job_name_prefix</span><span class="p">)</span></div>
<div class="viewcode-block" id="TriggerCopyJobs.process_one"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.process_one">[docs]</a> <span class="k">def</span> <span class="nf">process_one</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">job_name_prefix</span><span class="p">):</span>
<span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span> <span class="o">=</span> <span class="n">element</span>
<span class="n">copy_to_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">if</span> <span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span>
<span class="s1">&#39;project&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">project</span>
<span class="n">copy_from_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="n">copy_from_reference</span><span class="o">.</span><span class="n">tableId</span> <span class="o">=</span> <span class="n">job_reference</span><span class="o">.</span><span class="n">jobId</span>
<span class="k">if</span> <span class="n">copy_from_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">copy_from_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span>
<span class="s1">&#39;project&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">project</span>
<span class="n">copy_job_name</span> <span class="o">=</span> <span class="s1">&#39;</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span>
<span class="n">job_name_prefix</span><span class="p">,</span>
<span class="n">_bq_uuid</span><span class="p">(</span>
<span class="s1">&#39;</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">.</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span>
<span class="n">copy_from_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">copy_from_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span>
<span class="n">copy_from_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)))</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s2">&quot;Triggering copy job from </span><span class="si">%s</span><span class="s2"> to </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span>
<span class="n">copy_from_reference</span><span class="p">,</span>
<span class="n">copy_to_reference</span><span class="p">)</span>
<span class="k">if</span> <span class="n">copy_to_reference</span><span class="o">.</span><span class="n">tableId</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_observed_tables</span><span class="p">:</span>
<span class="c1"># When the write_disposition for a job is WRITE_TRUNCATE,</span>
<span class="c1"># multiple copy jobs to the same destination can stump on</span>
<span class="c1"># each other, truncate data, and write to the BQ table over and</span>
<span class="c1"># over.</span>
<span class="c1"># Thus, the first copy job runs with the user&#39;s write_disposition,</span>
<span class="c1"># but afterwards, all jobs must always WRITE_APPEND to the table.</span>
<span class="c1"># If they do not, subsequent copy jobs will clear out data appended</span>
<span class="c1"># by previous jobs.</span>
<span class="n">write_disposition</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span>
<span class="n">wait_for_job</span> <span class="o">=</span> <span class="kc">True</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_observed_tables</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">copy_to_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">wait_for_job</span> <span class="o">=</span> <span class="kc">False</span>
<span class="n">write_disposition</span> <span class="o">=</span> <span class="s1">&#39;WRITE_APPEND&#39;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span> <span class="o">=</span> <span class="n">create_bigquery_io_metadata</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span><span class="p">)</span>
<span class="n">project_id</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span> <span class="ow">is</span> <span class="kc">None</span> <span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span><span class="p">)</span>
<span class="n">job_reference</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">_insert_copy_job</span><span class="p">(</span>
<span class="n">project_id</span><span class="p">,</span>
<span class="n">copy_job_name</span><span class="p">,</span>
<span class="n">copy_from_reference</span><span class="p">,</span>
<span class="n">copy_to_reference</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="n">write_disposition</span><span class="p">,</span>
<span class="n">job_labels</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span><span class="o">.</span><span class="n">add_additional_bq_job_labels</span><span class="p">())</span>
<span class="k">if</span> <span class="n">wait_for_job</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">wait_for_bq_job</span><span class="p">(</span><span class="n">job_reference</span><span class="p">,</span> <span class="n">sleep_duration_sec</span><span class="o">=</span><span class="mi">10</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="o">.</span><span class="n">append</span><span class="p">(</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">((</span><span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span><span class="p">)))</span></div>
<div class="viewcode-block" id="TriggerCopyJobs.finish_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.finish_bundle">[docs]</a> <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">for</span> <span class="n">windowed_value</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="p">:</span>
<span class="n">job_ref</span> <span class="o">=</span> <span class="n">windowed_value</span><span class="o">.</span><span class="n">value</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">wait_for_bq_job</span><span class="p">(</span>
<span class="n">job_ref</span><span class="p">,</span> <span class="n">sleep_duration_sec</span><span class="o">=</span><span class="n">_SLEEP_DURATION_BETWEEN_POLLS</span><span class="p">)</span>
<span class="k">yield</span> <span class="n">windowed_value</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span>
<span class="n">TriggerCopyJobs</span><span class="o">.</span><span class="n">TRIGGER_DELETE_TEMP_TABLES</span><span class="p">,</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="kc">None</span><span class="p">))</span></div></div>
<div class="viewcode-block" id="TriggerLoadJobs"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs">[docs]</a><span class="k">class</span> <span class="nc">TriggerLoadJobs</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Triggers the import jobs to BQ.</span>
<span class="sd"> Experimental; no backwards compatibility guarantees.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">TEMP_TABLES</span> <span class="o">=</span> <span class="s1">&#39;TemporaryTables&#39;</span>
<span class="n">ONGOING_JOBS</span> <span class="o">=</span> <span class="s1">&#39;OngoingJobs&#39;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">schema</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">project</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">temporary_tables</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">additional_bq_parameters</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">source_format</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">step_name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">load_job_project_id</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span>
<span class="bp">self</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="n">project</span>
<span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span>
<span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span> <span class="o">=</span> <span class="n">temporary_tables</span>
<span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span> <span class="o">=</span> <span class="n">additional_bq_parameters</span> <span class="ow">or</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">source_format</span> <span class="o">=</span> <span class="n">source_format</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span> <span class="o">=</span> <span class="n">step_name</span>
<span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span> <span class="o">=</span> <span class="n">load_job_project_id</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span><span class="p">:</span>
<span class="c1"># If we are loading into temporary tables, we rely on the default create</span>
<span class="c1"># and write dispositions, which mean that a new table will be created.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="n">create_disposition</span>
<span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span>
<div class="viewcode-block" id="TriggerLoadJobs.display_data"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">result</span> <span class="o">=</span> <span class="p">{</span>
<span class="s1">&#39;create_disposition&#39;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">),</span>
<span class="s1">&#39;write_disposition&#39;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">),</span>
<span class="s1">&#39;additional_bq_params&#39;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">),</span>
<span class="s1">&#39;schema&#39;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">),</span>
<span class="s1">&#39;launchesBigQueryJobs&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span>
<span class="kc">True</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s2">&quot;This Dataflow job launches bigquery jobs.&quot;</span><span class="p">),</span>
<span class="s1">&#39;source_format&#39;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source_format</span><span class="p">),</span>
<span class="p">}</span>
<span class="k">return</span> <span class="n">result</span></div>
<div class="viewcode-block" id="TriggerLoadJobs.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span> <span class="o">=</span> <span class="n">create_bigquery_io_metadata</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span> <span class="o">=</span> <span class="p">[]</span></div>
<div class="viewcode-block" id="TriggerLoadJobs.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">load_job_name_prefix</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">):</span>
<span class="c1"># Each load job is assumed to have files respecting these constraints:</span>
<span class="c1"># 1. Total size of all files &lt; 15 TB (Max size for load jobs)</span>
<span class="c1"># 2. Total no. of files in a single load job &lt; 10,000</span>
<span class="c1"># This assumption means that there will always be a single load job</span>
<span class="c1"># triggered for each partition of files.</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">files</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="k">if</span> <span class="nb">callable</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">):</span>
<span class="n">schema</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span>
<span class="n">schema</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">schema</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span>
<span class="k">if</span> <span class="nb">callable</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">):</span>
<span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span>
<span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span>
<span class="n">table_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">if</span> <span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span>
<span class="s1">&#39;project&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">project</span>
<span class="c1"># Load jobs for a single destination are always triggered from the same</span>
<span class="c1"># worker. This means that we can generate a deterministic numbered job id,</span>
<span class="c1"># and not need to worry.</span>
<span class="n">destination_hash</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">(</span>
<span class="s1">&#39;</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">.</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">))</span>
<span class="n">uid</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">()</span>
<span class="n">job_name</span> <span class="o">=</span> <span class="s1">&#39;</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">load_job_name_prefix</span><span class="p">,</span> <span class="n">destination_hash</span><span class="p">,</span> <span class="n">uid</span><span class="p">)</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Load job has </span><span class="si">%s</span><span class="s1"> files. Job name is </span><span class="si">%s</span><span class="s1">.&#39;</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">files</span><span class="p">),</span> <span class="n">job_name</span><span class="p">)</span>
<span class="n">create_disposition</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span><span class="p">:</span>
<span class="c1"># If we are using temporary tables, then we must always create the</span>
<span class="c1"># temporary tables, so we replace the create_disposition.</span>
<span class="n">create_disposition</span> <span class="o">=</span> <span class="s1">&#39;CREATE_IF_NEEDED&#39;</span>
<span class="c1"># For temporary tables, we create a new table with the name with JobId.</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span> <span class="o">=</span> <span class="n">job_name</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span>
<span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">TEMP_TABLES</span><span class="p">,</span>
<span class="n">bigquery_tools</span><span class="o">.</span><span class="n">get_hashable_destination</span><span class="p">(</span><span class="n">table_reference</span><span class="p">))</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;Triggering job </span><span class="si">%s</span><span class="s1"> to load data to BigQuery table </span><span class="si">%s</span><span class="s1">.&#39;</span>
<span class="s1">&#39;Schema: </span><span class="si">%s</span><span class="s1">. Additional parameters: </span><span class="si">%s</span><span class="s1">. Source format: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">job_name</span><span class="p">,</span>
<span class="n">table_reference</span><span class="p">,</span>
<span class="n">schema</span><span class="p">,</span>
<span class="n">additional_parameters</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">source_format</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span> <span class="o">=</span> <span class="n">create_bigquery_io_metadata</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span><span class="p">)</span>
<span class="n">job_reference</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">perform_load_job</span><span class="p">(</span>
<span class="n">destination</span><span class="o">=</span><span class="n">table_reference</span><span class="p">,</span>
<span class="n">source_uris</span><span class="o">=</span><span class="n">files</span><span class="p">,</span>
<span class="n">job_id</span><span class="o">=</span><span class="n">job_name</span><span class="p">,</span>
<span class="n">schema</span><span class="o">=</span><span class="n">schema</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="n">create_disposition</span><span class="p">,</span>
<span class="n">additional_load_parameters</span><span class="o">=</span><span class="n">additional_parameters</span><span class="p">,</span>
<span class="n">source_format</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source_format</span><span class="p">,</span>
<span class="n">job_labels</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span><span class="o">.</span><span class="n">add_additional_bq_job_labels</span><span class="p">(),</span>
<span class="n">load_job_project_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span><span class="p">)</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span>
<span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">ONGOING_JOBS</span><span class="p">,</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="o">.</span><span class="n">append</span><span class="p">(</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">((</span><span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span><span class="p">)))</span></div>
<div class="viewcode-block" id="TriggerLoadJobs.finish_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.finish_bundle">[docs]</a> <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">for</span> <span class="n">windowed_value</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="p">:</span>
<span class="n">job_ref</span> <span class="o">=</span> <span class="n">windowed_value</span><span class="o">.</span><span class="n">value</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">wait_for_bq_job</span><span class="p">(</span>
<span class="n">job_ref</span><span class="p">,</span> <span class="n">sleep_duration_sec</span><span class="o">=</span><span class="n">_SLEEP_DURATION_BETWEEN_POLLS</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span></div></div>
<div class="viewcode-block" id="PartitionFiles"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.PartitionFiles">[docs]</a><span class="k">class</span> <span class="nc">PartitionFiles</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="n">MULTIPLE_PARTITIONS_TAG</span> <span class="o">=</span> <span class="s1">&#39;MULTIPLE_PARTITIONS&#39;</span>
<span class="n">SINGLE_PARTITION_TAG</span> <span class="o">=</span> <span class="s1">&#39;SINGLE_PARTITION&#39;</span>
<div class="viewcode-block" id="PartitionFiles.Partition"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.PartitionFiles.Partition">[docs]</a> <span class="k">class</span> <span class="nc">Partition</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">max_size</span><span class="p">,</span> <span class="n">max_files</span><span class="p">,</span> <span class="n">files</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">size</span><span class="o">=</span><span class="mi">0</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_size</span> <span class="o">=</span> <span class="n">max_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_files</span> <span class="o">=</span> <span class="n">max_files</span>
<span class="bp">self</span><span class="o">.</span><span class="n">files</span> <span class="o">=</span> <span class="n">files</span> <span class="k">if</span> <span class="n">files</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="k">else</span> <span class="p">[]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">size</span> <span class="o">=</span> <span class="n">size</span>
<div class="viewcode-block" id="PartitionFiles.Partition.can_accept"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.PartitionFiles.Partition.can_accept">[docs]</a> <span class="k">def</span> <span class="nf">can_accept</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">file_size</span><span class="p">,</span> <span class="n">no_of_files</span><span class="o">=</span><span class="mi">1</span><span class="p">):</span>
<span class="k">if</span> <span class="p">(((</span><span class="bp">self</span><span class="o">.</span><span class="n">size</span> <span class="o">+</span> <span class="n">file_size</span><span class="p">)</span> <span class="o">&lt;=</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_size</span><span class="p">)</span> <span class="ow">and</span>
<span class="p">((</span><span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">files</span><span class="p">)</span> <span class="o">+</span> <span class="n">no_of_files</span><span class="p">)</span> <span class="o">&lt;=</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files</span><span class="p">)):</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span></div>
<div class="viewcode-block" id="PartitionFiles.Partition.add"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.PartitionFiles.Partition.add">[docs]</a> <span class="k">def</span> <span class="nf">add</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">files</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">file_path</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">size</span> <span class="o">+=</span> <span class="n">file_size</span></div></div>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">max_partition_size</span><span class="p">,</span> <span class="n">max_files_per_partition</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_partition_size</span> <span class="o">=</span> <span class="n">max_partition_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_partition</span> <span class="o">=</span> <span class="n">max_files_per_partition</span>
<div class="viewcode-block" id="PartitionFiles.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.PartitionFiles.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">files</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="n">partitions</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">files</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;Ignoring a BigQuery batch load partition to </span><span class="si">%s</span><span class="s1"> &#39;</span>
<span class="s1">&#39;that contains no source URIs.&#39;</span><span class="p">,</span>
<span class="n">destination</span><span class="p">)</span>
<span class="k">return</span>
<span class="n">latest_partition</span> <span class="o">=</span> <span class="n">PartitionFiles</span><span class="o">.</span><span class="n">Partition</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_partition_size</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_partition</span><span class="p">)</span>
<span class="k">for</span> <span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span> <span class="ow">in</span> <span class="n">files</span><span class="p">:</span>
<span class="k">if</span> <span class="n">latest_partition</span><span class="o">.</span><span class="n">can_accept</span><span class="p">(</span><span class="n">file_size</span><span class="p">):</span>
<span class="n">latest_partition</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">partitions</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">latest_partition</span><span class="o">.</span><span class="n">files</span><span class="p">)</span>
<span class="n">latest_partition</span> <span class="o">=</span> <span class="n">PartitionFiles</span><span class="o">.</span><span class="n">Partition</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_partition_size</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_partition</span><span class="p">)</span>
<span class="n">latest_partition</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">)</span>
<span class="n">partitions</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">latest_partition</span><span class="o">.</span><span class="n">files</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">partitions</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">1</span><span class="p">:</span>
<span class="n">output_tag</span> <span class="o">=</span> <span class="n">PartitionFiles</span><span class="o">.</span><span class="n">MULTIPLE_PARTITIONS_TAG</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">output_tag</span> <span class="o">=</span> <span class="n">PartitionFiles</span><span class="o">.</span><span class="n">SINGLE_PARTITION_TAG</span>
<span class="k">for</span> <span class="n">partition</span> <span class="ow">in</span> <span class="n">partitions</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span><span class="n">output_tag</span><span class="p">,</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">partition</span><span class="p">))</span></div></div>
<div class="viewcode-block" id="DeleteTablesFn"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn">[docs]</a><span class="k">class</span> <span class="nc">DeleteTablesFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span>
<div class="viewcode-block" id="DeleteTablesFn.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span></div>
<div class="viewcode-block" id="DeleteTablesFn.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">table_reference</span><span class="p">):</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Deleting table </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">table_reference</span><span class="p">)</span>
<span class="n">table_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">table_reference</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">_delete_table</span><span class="p">(</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)</span></div></div>
<div class="viewcode-block" id="BigQueryBatchFileLoads"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads">[docs]</a><span class="k">class</span> <span class="nc">BigQueryBatchFileLoads</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Takes in a set of elements, and inserts them to BigQuery via batch loads.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">DESTINATION_JOBID_PAIRS</span> <span class="o">=</span> <span class="s1">&#39;destination_load_jobid_pairs&#39;</span>
<span class="n">DESTINATION_FILE_PAIRS</span> <span class="o">=</span> <span class="s1">&#39;destination_file_pairs&#39;</span>
<span class="n">DESTINATION_COPY_JOBID_PAIRS</span> <span class="o">=</span> <span class="s1">&#39;destination_copy_jobid_pairs&#39;</span>
<span class="n">COUNT</span> <span class="o">=</span> <span class="mi">0</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">destination</span><span class="p">,</span>
<span class="n">project</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">schema</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">custom_gcs_temp_location</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">triggering_frequency</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">with_auto_sharding</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">temp_file_format</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">max_file_size</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">max_files_per_bundle</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">max_partition_size</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">max_files_per_partition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">additional_bq_parameters</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">table_side_inputs</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">schema_side_inputs</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">validate</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">is_streaming_pipeline</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">load_job_project_id</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">destination</span> <span class="o">=</span> <span class="n">destination</span>
<span class="bp">self</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="n">project</span>
<span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="n">create_disposition</span>
<span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span>
<span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span> <span class="o">=</span> <span class="n">triggering_frequency</span>
<span class="bp">self</span><span class="o">.</span><span class="n">with_auto_sharding</span> <span class="o">=</span> <span class="n">with_auto_sharding</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span> <span class="o">=</span> <span class="n">max_file_size</span> <span class="ow">or</span> <span class="n">_DEFAULT_MAX_FILE_SIZE</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">max_files_per_bundle</span> <span class="ow">or</span> <span class="n">_DEFAULT_MAX_WRITERS_PER_BUNDLE</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_partition_size</span> <span class="o">=</span> <span class="n">max_partition_size</span> <span class="ow">or</span> <span class="n">_MAXIMUM_LOAD_SIZE</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_partition</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">max_files_per_partition</span> <span class="ow">or</span> <span class="n">_MAXIMUM_SOURCE_URIS</span><span class="p">)</span>
<span class="k">if</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">custom_gcs_temp_location</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span> <span class="ow">or</span>
<span class="n">custom_gcs_temp_location</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">StaticValueProvider</span><span class="p">(</span>
<span class="nb">str</span><span class="p">,</span> <span class="n">custom_gcs_temp_location</span> <span class="ow">or</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">custom_gcs_temp_location</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span> <span class="o">=</span> <span class="n">custom_gcs_temp_location</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;custom_gcs_temp_location must be str or ValueProvider&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_temp_file_format</span> <span class="o">=</span> <span class="n">temp_file_format</span> <span class="ow">or</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">FileFormat</span><span class="o">.</span><span class="n">JSON</span>
<span class="c1"># If we have multiple destinations, then we will have multiple load jobs,</span>
<span class="c1"># thus we will need temporary tables for atomicity.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dynamic_destinations</span> <span class="o">=</span> <span class="nb">bool</span><span class="p">(</span><span class="nb">callable</span><span class="p">(</span><span class="n">destination</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span> <span class="o">=</span> <span class="n">additional_bq_parameters</span> <span class="ow">or</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">table_side_inputs</span> <span class="o">=</span> <span class="n">table_side_inputs</span> <span class="ow">or</span> <span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span> <span class="o">=</span> <span class="n">schema_side_inputs</span> <span class="ow">or</span> <span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="o">=</span> <span class="n">is_streaming_pipeline</span>
<span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span> <span class="o">=</span> <span class="n">load_job_project_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_validate</span> <span class="o">=</span> <span class="n">validate</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_validate</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">verify</span><span class="p">()</span>
<div class="viewcode-block" id="BigQueryBatchFileLoads.verify"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads.verify">[docs]</a> <span class="k">def</span> <span class="nf">verify</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="o">.</span><span class="n">get</span><span class="p">(),</span> <span class="n">vp</span><span class="o">.</span><span class="n">StaticValueProvider</span><span class="p">)</span>
<span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="o">.</span><span class="n">get</span><span class="p">()</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;gs://&#39;</span><span class="p">)):</span>
<span class="c1"># Only fail if the custom location is provided, and it is not a GCS</span>
<span class="c1"># location.</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;Invalid GCS location: </span><span class="si">%r</span><span class="s1">.</span><span class="se">\n</span><span class="s1">&#39;</span>
<span class="s1">&#39;Writing to BigQuery with FILE_LOADS method requires a &#39;</span>
<span class="s1">&#39;GCS location to be provided to write files to be &#39;</span>
<span class="s1">&#39;loaded into BigQuery. Please provide a GCS bucket, or &#39;</span>
<span class="s1">&#39;pass method=&quot;STREAMING_INSERTS&quot; to WriteToBigQuery.&#39;</span> <span class="o">%</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="o">.</span><span class="n">get</span><span class="p">())</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;triggering_frequency must be specified to use file&#39;</span>
<span class="s1">&#39;loads in streaming&#39;</span><span class="p">)</span>
<span class="k">elif</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;triggering_frequency can only be used with file&#39;</span>
<span class="s1">&#39;loads in streaming&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">with_auto_sharding</span><span class="p">:</span>
<span class="k">return</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;with_auto_sharding can only be used with file loads in streaming.&#39;</span><span class="p">)</span></div>
<span class="k">def</span> <span class="nf">_window_fn</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Set the correct WindowInto PTransform&quot;&quot;&quot;</span>
<span class="c1"># The user-supplied triggering_frequency is often chosen to control how</span>
<span class="c1"># many BigQuery load jobs are triggered, to prevent going over BigQuery&#39;s</span>
<span class="c1"># daily quota for load jobs. If this is set to a large value, currently we</span>
<span class="c1"># have to buffer all the data until the trigger fires. Instead we ensure</span>
<span class="c1"># that the files are written if a threshold number of records are ready.</span>
<span class="c1"># We use only the user-supplied trigger on the actual BigQuery load.</span>
<span class="c1"># This allows us to offload the data to the filesystem.</span>
<span class="c1">#</span>
<span class="c1"># In the case of dynamic sharding, however, we use a default trigger since</span>
<span class="c1"># the transform performs sharding also batches elements to avoid generating</span>
<span class="c1"># too many tiny files. User trigger is applied right after writes to limit</span>
<span class="c1"># the number of load jobs.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">with_auto_sharding</span><span class="p">:</span>
<span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">(),</span>
<span class="n">trigger</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">Repeatedly</span><span class="p">(</span>
<span class="n">trigger</span><span class="o">.</span><span class="n">AfterAny</span><span class="p">(</span>
<span class="n">trigger</span><span class="o">.</span><span class="n">AfterProcessingTime</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">),</span>
<span class="n">trigger</span><span class="o">.</span><span class="n">AfterCount</span><span class="p">(</span>
<span class="n">_FILE_TRIGGERING_RECORD_COUNT</span><span class="p">))),</span>
<span class="n">accumulation_mode</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">AccumulationMode</span>\
<span class="o">.</span><span class="n">DISCARDING</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span>
<span class="k">def</span> <span class="nf">_maybe_apply_user_trigger</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">destination_file_kv_pc</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span><span class="p">:</span>
<span class="c1"># Apply the user&#39;s trigger back before we start triggering load jobs</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">destination_file_kv_pc</span>
<span class="o">|</span> <span class="s2">&quot;ApplyUserTrigger&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span>
<span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">(),</span>
<span class="n">trigger</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">Repeatedly</span><span class="p">(</span>
<span class="n">trigger</span><span class="o">.</span><span class="n">AfterAll</span><span class="p">(</span>
<span class="n">trigger</span><span class="o">.</span><span class="n">AfterProcessingTime</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">),</span>
<span class="n">trigger</span><span class="o">.</span><span class="n">AfterCount</span><span class="p">(</span><span class="mi">1</span><span class="p">))),</span>
<span class="n">accumulation_mode</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">AccumulationMode</span><span class="o">.</span><span class="n">DISCARDING</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">destination_file_kv_pc</span>
<span class="k">def</span> <span class="nf">_write_files</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">destination_data_kv_pc</span><span class="p">,</span> <span class="n">file_prefix_pcv</span><span class="p">):</span>
<span class="n">outputs</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">destination_data_kv_pc</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">WriteRecordsToFile</span><span class="p">(</span>
<span class="n">schema</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span>
<span class="n">max_files_per_bundle</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span><span class="p">,</span>
<span class="n">max_file_size</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">,</span>
<span class="n">file_format</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_temp_file_format</span><span class="p">),</span>
<span class="n">file_prefix_pcv</span><span class="p">,</span>
<span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span><span class="p">)</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span>
<span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">UNWRITTEN_RECORD_TAG</span><span class="p">,</span>
<span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">))</span>
<span class="c1"># A PCollection of (destination, file) tuples. It lists files with records,</span>
<span class="c1"># and the destination each file is meant to be imported into.</span>
<span class="n">destination_files_kv_pc</span> <span class="o">=</span> <span class="n">outputs</span><span class="p">[</span><span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">]</span>
<span class="c1"># A PCollection of (destination, record) tuples. These are later sharded,</span>
<span class="c1"># grouped, and all records for each destination-shard is written to files.</span>
<span class="c1"># This PCollection is necessary because not all records can be written into</span>
<span class="c1"># files in ``WriteRecordsToFile``.</span>
<span class="n">unwritten_records_pc</span> <span class="o">=</span> <span class="n">outputs</span><span class="p">[</span><span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">UNWRITTEN_RECORD_TAG</span><span class="p">]</span>
<span class="n">more_destination_files_kv_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">unwritten_records_pc</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">_ShardDestinations</span><span class="p">())</span>
<span class="o">|</span> <span class="s2">&quot;GroupShardedRows&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span>
<span class="o">|</span> <span class="s2">&quot;DropShardNumber&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="n">x</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">],</span> <span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">]))</span>
<span class="o">|</span> <span class="s2">&quot;WriteGroupedRecordsToFile&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">WriteGroupedRecordsToFile</span><span class="p">(</span>
<span class="n">schema</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="n">file_format</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_temp_file_format</span><span class="p">),</span>
<span class="n">file_prefix_pcv</span><span class="p">,</span>
<span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span><span class="p">))</span>
<span class="c1"># TODO(https://github.com/apache/beam/issues/20285): Remove the identity</span>
<span class="c1"># transform. We flatten both PCollection paths and use an identity function</span>
<span class="c1"># to work around a flatten optimization issue where the wrong coder is</span>
<span class="c1"># being used.</span>
<span class="n">all_destination_file_pairs_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="p">(</span><span class="n">destination_files_kv_pc</span><span class="p">,</span> <span class="n">more_destination_files_kv_pc</span><span class="p">)</span>
<span class="o">|</span> <span class="s2">&quot;DestinationFilesUnion&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">()</span>
<span class="o">|</span> <span class="s2">&quot;IdentityWorkaround&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">))</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_maybe_apply_user_trigger</span><span class="p">(</span><span class="n">all_destination_file_pairs_pc</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_write_files_with_auto_sharding</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">destination_data_kv_pc</span><span class="p">,</span> <span class="n">file_prefix_pcv</span><span class="p">):</span>
<span class="n">clock</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="o">.</span><span class="n">test_clock</span> <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="k">else</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span>
<span class="c1"># Auto-sharding is achieved via GroupIntoBatches.WithShardedKey</span>
<span class="c1"># transform which shards, groups and at the same time batches the table rows</span>
<span class="c1"># to be inserted to BigQuery.</span>
<span class="c1"># Firstly, the keys of tagged_data (table references) are converted to a</span>
<span class="c1"># hashable format. This is needed to work with the keyed states used by.</span>
<span class="c1"># GroupIntoBatches. After grouping and batching is done, table references</span>
<span class="c1"># are restored.</span>
<span class="n">destination_files_kv_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">destination_data_kv_pc</span>
<span class="o">|</span>
<span class="s1">&#39;ToHashableTableRef&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="n">bigquery_tools</span><span class="o">.</span><span class="n">to_hashable_table_ref</span><span class="p">)</span>
<span class="o">|</span> <span class="s1">&#39;WithAutoSharding&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">GroupIntoBatches</span><span class="o">.</span><span class="n">WithShardedKey</span><span class="p">(</span>
<span class="n">batch_size</span><span class="o">=</span><span class="n">_FILE_TRIGGERING_RECORD_COUNT</span><span class="p">,</span>
<span class="n">max_buffering_duration_secs</span><span class="o">=</span><span class="n">_FILE_TRIGGERING_BATCHING_DURATION_SECS</span><span class="p">,</span>
<span class="n">clock</span><span class="o">=</span><span class="n">clock</span><span class="p">)</span>
<span class="o">|</span> <span class="s1">&#39;FromHashableTableRefAndDropShard&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">kvs</span><span class="p">:</span>
<span class="p">(</span><span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">kvs</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">key</span><span class="p">),</span> <span class="n">kvs</span><span class="p">[</span><span class="mi">1</span><span class="p">]))</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">WriteGroupedRecordsToFile</span><span class="p">(</span>
<span class="n">schema</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="n">file_format</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_temp_file_format</span><span class="p">),</span>
<span class="n">file_prefix_pcv</span><span class="p">,</span>
<span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span><span class="p">))</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_maybe_apply_user_trigger</span><span class="p">(</span><span class="n">destination_files_kv_pc</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_load_data</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">partitions_using_temp_tables</span><span class="p">,</span>
<span class="n">partitions_direct_to_destination</span><span class="p">,</span>
<span class="n">load_job_name_pcv</span><span class="p">,</span>
<span class="n">schema_mod_job_name_pcv</span><span class="p">,</span>
<span class="n">copy_job_name_pcv</span><span class="p">,</span>
<span class="n">p</span><span class="p">,</span>
<span class="n">step_name</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Load data to BigQuery</span>
<span class="sd"> Data is loaded into BigQuery in the following two ways:</span>
<span class="sd"> 1. Single partition:</span>
<span class="sd"> When there is a single partition of files destined to a single</span>
<span class="sd"> destination, a single load job is triggered.</span>
<span class="sd"> 2. Multiple partitions and/or Dynamic Destinations:</span>
<span class="sd"> When there are multiple partitions of files destined for a single</span>
<span class="sd"> destination or when Dynamic Destinations are used, multiple load jobs</span>
<span class="sd"> need to be triggered for each partition/destination. Load Jobs are</span>
<span class="sd"> triggered to temporary tables, and those are later copied to the actual</span>
<span class="sd"> appropriate destination table. This ensures atomicity when only some</span>
<span class="sd"> of the load jobs would fail but not other. If any of them fails, then</span>
<span class="sd"> copy jobs are not triggered.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Load data using temp tables</span>
<span class="n">trigger_loads_outputs</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">partitions_using_temp_tables</span>
<span class="o">|</span> <span class="s2">&quot;TriggerLoadJobsWithTempTables&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">TriggerLoadJobs</span><span class="p">(</span>
<span class="n">schema</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span>
<span class="n">project</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">project</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">,</span>
<span class="n">temporary_tables</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">additional_bq_parameters</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">,</span>
<span class="n">source_format</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_temp_file_format</span><span class="p">,</span>
<span class="n">step_name</span><span class="o">=</span><span class="n">step_name</span><span class="p">,</span>
<span class="n">load_job_project_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span><span class="p">),</span>
<span class="n">load_job_name_pcv</span><span class="p">,</span>
<span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span><span class="p">)</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span>
<span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">TEMP_TABLES</span><span class="p">,</span>
<span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">ONGOING_JOBS</span><span class="p">,</span>
<span class="n">main</span><span class="o">=</span><span class="s1">&#39;main&#39;</span><span class="p">))</span>
<span class="n">finished_temp_tables_load_job_ids_pc</span> <span class="o">=</span> <span class="n">trigger_loads_outputs</span><span class="p">[</span><span class="s1">&#39;main&#39;</span><span class="p">]</span>
<span class="n">temp_tables_load_job_ids_pc</span> <span class="o">=</span> <span class="n">trigger_loads_outputs</span><span class="p">[</span>
<span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">ONGOING_JOBS</span><span class="p">]</span>
<span class="n">temp_tables_pc</span> <span class="o">=</span> <span class="n">trigger_loads_outputs</span><span class="p">[</span><span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">TEMP_TABLES</span><span class="p">]</span>
<span class="n">schema_mod_job_ids_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">finished_temp_tables_load_job_ids_pc</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">UpdateDestinationSchema</span><span class="p">(</span>
<span class="n">project</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">project</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">,</span>
<span class="n">additional_bq_parameters</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">,</span>
<span class="n">step_name</span><span class="o">=</span><span class="n">step_name</span><span class="p">,</span>
<span class="n">load_job_project_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span><span class="p">),</span>
<span class="n">schema_mod_job_name_pcv</span><span class="p">))</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">==</span> <span class="s1">&#39;WRITE_TRUNCATE&#39;</span><span class="p">:</span>
<span class="c1"># All loads going to the same table must be processed together so that</span>
<span class="c1"># the truncation happens only once. See BEAM-24535.</span>
<span class="n">finished_temp_tables_load_job_ids_list_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">finished_temp_tables_load_job_ids_pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">MapTuple</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">destination</span><span class="p">,</span>
<span class="n">job_reference</span><span class="p">:</span> <span class="p">(</span>
<span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span><span class="o">.</span><span class="n">tableId</span><span class="p">,</span>
<span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span><span class="p">)))</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">MapTuple</span><span class="p">(</span><span class="k">lambda</span> <span class="n">tableId</span><span class="p">,</span> <span class="n">batch</span><span class="p">:</span> <span class="nb">list</span><span class="p">(</span><span class="n">batch</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># Loads can happen in parallel.</span>
<span class="n">finished_temp_tables_load_job_ids_list_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">finished_temp_tables_load_job_ids_pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">[</span><span class="n">x</span><span class="p">]))</span>
<span class="n">copy_job_outputs</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">finished_temp_tables_load_job_ids_list_pc</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">TriggerCopyJobs</span><span class="p">(</span>
<span class="n">project</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">project</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">,</span>
<span class="n">step_name</span><span class="o">=</span><span class="n">step_name</span><span class="p">,</span>
<span class="n">load_job_project_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span><span class="p">),</span>
<span class="n">copy_job_name_pcv</span><span class="p">,</span>
<span class="n">pvalue</span><span class="o">.</span><span class="n">AsIter</span><span class="p">(</span><span class="n">schema_mod_job_ids_pc</span><span class="p">))</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span>
<span class="n">TriggerCopyJobs</span><span class="o">.</span><span class="n">TRIGGER_DELETE_TEMP_TABLES</span><span class="p">,</span> <span class="n">main</span><span class="o">=</span><span class="s1">&#39;main&#39;</span><span class="p">))</span>
<span class="n">destination_copy_job_ids_pc</span> <span class="o">=</span> <span class="n">copy_job_outputs</span><span class="p">[</span><span class="s1">&#39;main&#39;</span><span class="p">]</span>
<span class="n">trigger_delete</span> <span class="o">=</span> <span class="n">copy_job_outputs</span><span class="p">[</span>
<span class="n">TriggerCopyJobs</span><span class="o">.</span><span class="n">TRIGGER_DELETE_TEMP_TABLES</span><span class="p">]</span>
<span class="n">_</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">temp_tables_pc</span>
<span class="o">|</span> <span class="s2">&quot;RemoveTempTables/AddUselessValue&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">unused_trigger</span><span class="p">:</span> <span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsList</span><span class="p">(</span><span class="n">trigger_delete</span><span class="p">))</span>
<span class="o">|</span> <span class="s2">&quot;RemoveTempTables/DeduplicateTables&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span>
<span class="o">|</span> <span class="s2">&quot;RemoveTempTables/GetTableNames&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Keys</span><span class="p">()</span>
<span class="o">|</span> <span class="s2">&quot;RemoveTempTables/Delete&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">DeleteTablesFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)))</span>
<span class="c1"># Load data directly to destination table</span>
<span class="n">destination_load_job_ids_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">partitions_direct_to_destination</span>
<span class="o">|</span> <span class="s2">&quot;TriggerLoadJobsWithoutTempTables&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">TriggerLoadJobs</span><span class="p">(</span>
<span class="n">schema</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">,</span>
<span class="n">temporary_tables</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">additional_bq_parameters</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">,</span>
<span class="n">source_format</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_temp_file_format</span><span class="p">,</span>
<span class="n">step_name</span><span class="o">=</span><span class="n">step_name</span><span class="p">,</span>
<span class="n">load_job_project_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span><span class="p">),</span>
<span class="n">load_job_name_pcv</span><span class="p">,</span>
<span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span><span class="p">)</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span>
<span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">ONGOING_JOBS</span><span class="p">,</span> <span class="n">main</span><span class="o">=</span><span class="s1">&#39;main&#39;</span><span class="p">)</span>
<span class="p">)[</span><span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">ONGOING_JOBS</span><span class="p">]</span>
<span class="n">destination_load_job_ids_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="p">(</span><span class="n">temp_tables_load_job_ids_pc</span><span class="p">,</span> <span class="n">destination_load_job_ids_pc</span><span class="p">)</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">())</span>
<span class="k">return</span> <span class="n">destination_load_job_ids_pc</span><span class="p">,</span> <span class="n">destination_copy_job_ids_pc</span>
<div class="viewcode-block" id="BigQueryBatchFileLoads.expand"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="n">p</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span>
<span class="bp">self</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">project</span> <span class="ow">or</span> <span class="n">p</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">project</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">step_name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">label</span>
<span class="k">except</span> <span class="ne">AttributeError</span><span class="p">:</span>
<span class="n">step_name</span> <span class="o">=</span> <span class="s1">&#39;BigQueryBatchFileLoads_</span><span class="si">%d</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">BigQueryBatchFileLoads</span><span class="o">.</span><span class="n">COUNT</span>
<span class="n">BigQueryBatchFileLoads</span><span class="o">.</span><span class="n">COUNT</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="n">temp_location</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">temp_location</span>
<span class="n">job_name</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">p</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">job_name</span> <span class="ow">or</span> <span class="s1">&#39;AUTOMATIC_JOB_NAME&#39;</span><span class="p">)</span>
<span class="n">empty_pc</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="s2">&quot;ImpulseEmptyPC&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([])</span>
<span class="n">singleton_pc</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="s2">&quot;ImpulseSingleElementPC&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span>
<span class="n">load_job_name_pcv</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span>
<span class="n">singleton_pc</span>
<span class="o">|</span> <span class="s2">&quot;LoadJobNamePrefix&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="n">_generate_job_name</span><span class="p">(</span>
<span class="n">job_name</span><span class="p">,</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryJobTypes</span><span class="o">.</span><span class="n">LOAD</span><span class="p">,</span> <span class="s1">&#39;LOAD_STEP&#39;</span><span class="p">)))</span>
<span class="n">schema_mod_job_name_pcv</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span>
<span class="n">singleton_pc</span>
<span class="o">|</span> <span class="s2">&quot;SchemaModJobNamePrefix&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="n">_generate_job_name</span><span class="p">(</span>
<span class="n">job_name</span><span class="p">,</span>
<span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryJobTypes</span><span class="o">.</span><span class="n">LOAD</span><span class="p">,</span>
<span class="s1">&#39;SCHEMA_MOD_STEP&#39;</span><span class="p">)))</span>
<span class="n">copy_job_name_pcv</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span>
<span class="n">singleton_pc</span>
<span class="o">|</span> <span class="s2">&quot;CopyJobNamePrefix&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="n">_generate_job_name</span><span class="p">(</span>
<span class="n">job_name</span><span class="p">,</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryJobTypes</span><span class="o">.</span><span class="n">COPY</span><span class="p">,</span> <span class="s1">&#39;COPY_STEP&#39;</span><span class="p">)))</span>
<span class="n">file_prefix_pcv</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span>
<span class="n">singleton_pc</span>
<span class="o">|</span> <span class="s2">&quot;GenerateFilePrefix&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
<span class="n">file_prefix_generator</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_validate</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="p">,</span> <span class="n">temp_location</span><span class="p">)))</span>
<span class="n">destination_data_kv_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">pcoll</span>
<span class="o">|</span> <span class="s2">&quot;RewindowIntoGlobal&quot;</span> <span class="o">&gt;&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">_window_fn</span><span class="p">()</span>
<span class="o">|</span> <span class="s2">&quot;AppendDestination&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">bigquery_tools</span><span class="o">.</span><span class="n">AppendDestinationsFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">destination</span><span class="p">),</span>
<span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">table_side_inputs</span><span class="p">))</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">with_auto_sharding</span><span class="p">:</span>
<span class="n">all_destination_file_pairs_pc</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_write_files</span><span class="p">(</span>
<span class="n">destination_data_kv_pc</span><span class="p">,</span> <span class="n">file_prefix_pcv</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">all_destination_file_pairs_pc</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_write_files_with_auto_sharding</span><span class="p">(</span>
<span class="n">destination_data_kv_pc</span><span class="p">,</span> <span class="n">file_prefix_pcv</span><span class="p">)</span>
<span class="n">grouped_files_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">all_destination_file_pairs_pc</span>
<span class="o">|</span> <span class="s2">&quot;GroupFilesByTableDestinations&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">())</span>
<span class="n">partitions</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">grouped_files_pc</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">PartitionFiles</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_partition_size</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_partition</span><span class="p">))</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span>
<span class="n">PartitionFiles</span><span class="o">.</span><span class="n">MULTIPLE_PARTITIONS_TAG</span><span class="p">,</span>
<span class="n">PartitionFiles</span><span class="o">.</span><span class="n">SINGLE_PARTITION_TAG</span><span class="p">))</span>
<span class="n">multiple_partitions_per_destination_pc</span> <span class="o">=</span> <span class="n">partitions</span><span class="p">[</span>
<span class="n">PartitionFiles</span><span class="o">.</span><span class="n">MULTIPLE_PARTITIONS_TAG</span><span class="p">]</span>
<span class="n">single_partition_per_destination_pc</span> <span class="o">=</span> <span class="n">partitions</span><span class="p">[</span>
<span class="n">PartitionFiles</span><span class="o">.</span><span class="n">SINGLE_PARTITION_TAG</span><span class="p">]</span>
<span class="c1"># When using dynamic destinations, elements with both single as well as</span>
<span class="c1"># multiple partitions are loaded into BigQuery using temporary tables to</span>
<span class="c1"># ensure atomicity.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">dynamic_destinations</span><span class="p">:</span>
<span class="n">all_partitions</span> <span class="o">=</span> <span class="p">((</span>
<span class="n">multiple_partitions_per_destination_pc</span><span class="p">,</span>
<span class="n">single_partition_per_destination_pc</span><span class="p">)</span>
<span class="o">|</span> <span class="s2">&quot;FlattenPartitions&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">())</span>
<span class="n">destination_load_job_ids_pc</span><span class="p">,</span> <span class="n">destination_copy_job_ids_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_load_data</span><span class="p">(</span><span class="n">all_partitions</span><span class="p">,</span>
<span class="n">empty_pc</span><span class="p">,</span>
<span class="n">load_job_name_pcv</span><span class="p">,</span>
<span class="n">schema_mod_job_name_pcv</span><span class="p">,</span>
<span class="n">copy_job_name_pcv</span><span class="p">,</span>
<span class="n">p</span><span class="p">,</span>
<span class="n">step_name</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">destination_load_job_ids_pc</span><span class="p">,</span> <span class="n">destination_copy_job_ids_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_load_data</span><span class="p">(</span><span class="n">multiple_partitions_per_destination_pc</span><span class="p">,</span>
<span class="n">single_partition_per_destination_pc</span><span class="p">,</span>
<span class="n">load_job_name_pcv</span><span class="p">,</span>
<span class="n">schema_mod_job_name_pcv</span><span class="p">,</span>
<span class="n">copy_job_name_pcv</span><span class="p">,</span>
<span class="n">p</span><span class="p">,</span>
<span class="n">step_name</span><span class="p">))</span>
<span class="k">return</span> <span class="p">{</span>
<span class="bp">self</span><span class="o">.</span><span class="n">DESTINATION_JOBID_PAIRS</span><span class="p">:</span> <span class="n">destination_load_job_ids_pc</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">DESTINATION_FILE_PAIRS</span><span class="p">:</span> <span class="n">all_destination_file_pairs_pc</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">DESTINATION_COPY_JOBID_PAIRS</span><span class="p">:</span> <span class="n">destination_copy_job_ids_pc</span><span class="p">,</span>
<span class="p">}</span></div></div>
</pre></div>
</div>
</div>
</div>
</div>
</section>
</div>
</body>
</html>