blob: 2b985b49f4aab30b01c6937af973235d1da1ab82 [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>apache_beam.io.gcp.bigquery_file_loads &mdash; Apache Beam documentation</title>

  <!-- Theme stylesheet; rel="stylesheet" already implies text/css. -->
  <link rel="stylesheet" href="../../../../_static/css/theme.css">

  <!-- Links from this page to related pages of the generated documentation. -->
  <link rel="index" title="Index" href="../../../../genindex.html">
  <link rel="search" title="Search" href="../../../../search.html">
  <link rel="top" title="Apache Beam documentation" href="../../../../index.html">
  <link rel="up" title="Module code" href="../../../index.html">

  <!-- Loaded synchronously (no defer/async), exactly as before.
       NOTE(review): presumably Modernizr has to run before first render to
       replace the "no-js" class on <html>; confirm before deferring it. -->
  <script src="../../../../_static/js/modernizr.min.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side" aria-label="Site navigation">
  <div class="wy-side-scroll">
    <div class="wy-side-nav-search">
      <a href="../../../../index.html" class="icon icon-home"> Apache Beam
      </a>
      <div role="search">
        <form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get">
          <!-- A placeholder vanishes once the user types and is not a reliable
               accessible name, so the field also carries an aria-label.
               type="text" is kept as-is; presumably the theme CSS styles text
               inputs in this box, so confirm before switching to type="search". -->
          <input type="text" name="q" placeholder="Search docs" aria-label="Search docs">
          <input type="hidden" name="check_keywords" value="yes">
          <input type="hidden" name="area" value="default">
        </form>
      </div>
    </div>
    <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
      <!-- Package pages. -->
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.coders.html">apache_beam.coders package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.internal.html">apache_beam.internal package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.io.html">apache_beam.io package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.options.html">apache_beam.options package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.portability.html">apache_beam.portability package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.runners.html">apache_beam.runners package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.testing.html">apache_beam.testing package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.tools.html">apache_beam.tools package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.utils.html">apache_beam.utils package</a></li>
      </ul>
      <!-- Top-level module pages. -->
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.error.html">apache_beam.error module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.version.html">apache_beam.version module</a></li>
      </ul>
    </div>
  </div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
  <!-- NOTE(review): this fa-bars icon appears to be a menu toggle (it carries
       data-toggle="wy-nav-top"), but an <i> is not focusable and has no
       accessible name, so keyboard and screen-reader users cannot operate it.
       Consider a <button type="button" aria-label="Toggle navigation"> once
       the theme script and CSS are confirmed to work with that element. -->
  <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
  <a href="../../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
  <ul class="wy-breadcrumbs">
    <!-- The "»" separators are purely decorative; aria-hidden keeps screen
         readers from reading the separator character aloud. -->
    <li><a href="../../../../index.html">Docs</a> <span aria-hidden="true">&raquo;</span></li>
    <li><a href="../../../index.html">Module code</a> <span aria-hidden="true">&raquo;</span></li>
    <!-- The last crumb is the page being viewed. -->
    <li aria-current="page">apache_beam.io.gcp.bigquery_file_loads</li>
    <li class="wy-breadcrumbs-aside">
    </li>
  </ul>
  <hr>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.io.gcp.bigquery_file_loads</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd">Functionality to perform file loads into BigQuery for Batch and Streaming</span>
<span class="sd">pipelines.</span>
<span class="sd">This source is able to work around BigQuery load quotas and limitations. When</span>
<span class="sd">destinations are dynamic, or when data for a single job is too large, the data</span>
<span class="sd">will be split into multiple jobs.</span>
<span class="sd">NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">absolute_import</span>
<span class="kn">import</span> <span class="nn">datetime</span>
<span class="kn">import</span> <span class="nn">hashlib</span>
<span class="kn">import</span> <span class="nn">itertools</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">random</span>
<span class="kn">import</span> <span class="nn">time</span>
<span class="kn">import</span> <span class="nn">uuid</span>
<span class="kn">from</span> <span class="nn">future.utils</span> <span class="k">import</span> <span class="n">iteritems</span>
<span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="k">import</span> <span class="n">pvalue</span>
<span class="kn">from</span> <span class="nn">apache_beam.io</span> <span class="k">import</span> <span class="n">filesystems</span> <span class="k">as</span> <span class="n">fs</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp</span> <span class="k">import</span> <span class="n">bigquery_tools</span>
<span class="kn">from</span> <span class="nn">apache_beam.options</span> <span class="k">import</span> <span class="n">value_provider</span> <span class="k">as</span> <span class="n">vp</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="k">import</span> <span class="n">GoogleCloudOptions</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="k">import</span> <span class="n">trigger</span>
<span class="n">ONE_TERABYTE</span> <span class="o">=</span> <span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">40</span><span class="p">)</span><!-- Module-level size and count limits. ONE_TERABYTE is 2**40 bytes (a binary
  terabyte), so every limit below derived from it is a binary multiple. -->
<span class="c1"># The maximum file size for imports is 5TB. We keep our files under that.</span>
<span class="n">_DEFAULT_MAX_FILE_SIZE</span> <span class="o">=</span> <span class="mi">4</span> <span class="o">*</span> <span class="n">ONE_TERABYTE</span>
<span class="n">_DEFAULT_MAX_WRITERS_PER_BUNDLE</span> <span class="o">=</span> <span class="mi">20</span>
<span class="c1"># The maximum size for a single load job is one terabyte</span><!-- NOTE(review): this comment says "one terabyte", but _MAXIMUM_LOAD_SIZE on
  the next line is 15 * ONE_TERABYTE. This page is a viewcode mirror of the
  module, so correct the comment in bigquery_file_loads.py and regenerate. -->
<span class="n">_MAXIMUM_LOAD_SIZE</span> <span class="o">=</span> <span class="mi">15</span> <span class="o">*</span> <span class="n">ONE_TERABYTE</span>
<span class="c1"># Big query only supports up to 10 thousand URIs for a single load job.</span><!-- NOTE(review): "Big query" should read "BigQuery"; fix in the module source. -->
<span class="n">_MAXIMUM_SOURCE_URIS</span> <span class="o">=</span> <span class="mi">10</span><span class="o">*</span><span class="mi">1000</span>
<span class="c1"># If triggering_frequency is supplied, we will trigger the file write after</span>
<span class="c1"># this many records are written.</span>
<span class="n">_FILE_TRIGGERING_RECORD_COUNT</span> <span class="o">=</span> <span class="mi">500000</span>
<span class="k">def</span> <span class="nf">_generate_load_job_name</span><span class="p">():</span><!-- Builds a load-job name of the form beam_load_YYYY_MM_DD_HHMMSS_N from the
  local wall-clock time (datetime.datetime.now(), naive and not UTC) and a random
  integer suffix N. -->
<span class="n">datetime_component</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s2">&quot;%Y_%m_</span><span class="si">%d</span><span class="s2">_%H%M%S&quot;</span><span class="p">)</span>
<span class="c1"># TODO(pabloem): include job id / pipeline component?</span>
<span class="k">return</span> <span class="s1">&#39;beam_load_</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">datetime_component</span><span class="p">,</span> <span class="n">random</span><span class="o">.</span><span class="n">randint</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="mi">100</span><span class="p">))</span><!-- NOTE(review): random.randint(0, 100) is inclusive, so there are only 101
  possible suffixes; two names generated within the same second collide with
  probability about 1/101. A uuid-based suffix (as _bq_uuid produces) would avoid
  this; change it in the module source. -->
<div class="viewcode-block" id="file_prefix_generator"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.file_prefix_generator">[docs]</a><span class="k">def</span> <span class="nf">file_prefix_generator</span><span class="p">(</span><span class="n">with_validation</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">pipeline_gcs_location</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">temp_location</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span><!-- file_prefix_generator returns a one-argument function (_generate_file_prefix)
  that ignores its argument and returns a new staging prefix
  FileSystems.join(gcs_base, 'bq_load', _bq_uuid()). gcs_base is
  pipeline_gcs_location.get() when that is truthy, otherwise temp_location.
  With with_validation=True, an empty base or one not starting with 'gs://'
  raises ValueError when the returned function is called. -->
<span class="k">def</span> <span class="nf">_generate_file_prefix</span><span class="p">(</span><span class="n">unused_elm</span><span class="p">):</span>
<span class="c1"># If a gcs location is provided to the pipeline, then we shall use that.</span>
<span class="c1"># Otherwise, we shall use the temp_location from pipeline options.</span>
<span class="n">gcs_base</span> <span class="o">=</span> <span class="n">pipeline_gcs_location</span><span class="o">.</span><span class="n">get</span><span class="p">()</span><!-- NOTE(review): pipeline_gcs_location defaults to None in the signature, yet
  .get() is called on it unconditionally, so relying on the default raises
  AttributeError here instead of falling back to temp_location. Presumably
  callers always pass a value provider (vp is imported by this module); verify,
  and guard the call in bigquery_file_loads.py if None must be supported. -->
<span class="k">if</span> <span class="ow">not</span> <span class="n">gcs_base</span><span class="p">:</span>
<span class="n">gcs_base</span> <span class="o">=</span> <span class="n">temp_location</span>
<span class="c1"># This will fail at pipeline execution time, but will fail early, as this</span>
<span class="c1"># step doesn&#39;t have any dependencies (and thus will be one of the first</span>
<span class="c1"># stages to be run).</span>
<span class="k">if</span> <span class="n">with_validation</span> <span class="ow">and</span> <span class="p">(</span><span class="ow">not</span> <span class="n">gcs_base</span> <span class="ow">or</span> <span class="ow">not</span> <span class="n">gcs_base</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;gs://&#39;</span><span class="p">)):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Invalid GCS location: </span><span class="si">%r</span><span class="s1">.</span><span class="se">\n</span><span class="s1">&#39;</span>
<span class="s1">&#39;Writing to BigQuery with FILE_LOADS method requires a &#39;</span>
<span class="s1">&#39;GCS location to be provided to write files to be loaded&#39;</span>
<span class="s1">&#39; loaded into BigQuery. Please provide a GCS bucket, or &#39;</span><!-- NOTE(review): this literal is concatenated with the previous one, so the
  message reads "...files to be loaded loaded into BigQuery"; drop the
  duplicated word in the module source. -->
<span class="s1">&#39;pass method=&quot;STREAMING_INSERTS&quot; to WriteToBigQuery.&#39;</span>
<span class="o">%</span> <span class="n">gcs_base</span><span class="p">)</span>
<span class="n">prefix_uuid</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">()</span>
<span class="k">return</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">gcs_base</span><span class="p">,</span> <span class="s1">&#39;bq_load&#39;</span><span class="p">,</span> <span class="n">prefix_uuid</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_generate_file_prefix</span></div>
<span class="k">def</span> <span class="nf">_make_new_file_writer</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">):</span><!-- Creates a new file named by a random uuid4 under
  <file_prefix>/<destination>/ and returns (file_path, writer), where writer
  comes from FileSystems.create(file_path, 'application/text'). The destination
  is normalized with get_hashable_destination, ':' is replaced with '.', and the
  directory is created first if FileSystems.exists reports it missing. -->
<span class="n">destination</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">get_hashable_destination</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="c1"># Windows does not allow : on filenames. Replacing with underscore.</span><!-- NOTE(review): this comment says ':' is replaced with an underscore, but
  the code below replaces it with '.'; correct the comment (or the code) in the
  module source. -->
<span class="c1"># Other disallowed characters are:</span>
<span class="c1"># https://docs.microsoft.com/en-us/windows/desktop/fileio/naming-a-file</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">destination</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;:&#39;</span><span class="p">,</span> <span class="s1">&#39;.&#39;</span><span class="p">)</span>
<span class="n">directory</span> <span class="o">=</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">exists</span><span class="p">(</span><span class="n">directory</span><span class="p">):</span>
<span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">mkdirs</span><span class="p">(</span><span class="n">directory</span><span class="p">)</span>
<span class="n">file_name</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span>
<span class="n">file_path</span> <span class="o">=</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">,</span> <span class="n">file_name</span><span class="p">)</span>
<span class="k">return</span> <span class="n">file_path</span><span class="p">,</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">create</span><span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="s1">&#39;application/text&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_bq_uuid</span><span class="p">(</span><span class="n">seed</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span><!-- Returns a 32-character lowercase hex string. When seed is falsy (None or an
  empty string), it is a random uuid4 with the dashes removed. Otherwise it is
  the MD5 hex digest of the UTF-8 encoded seed, so the same seed always yields
  the same value. MD5 appears to be used only to derive a stable name here, not
  for any security purpose. -->
<span class="k">if</span> <span class="ow">not</span> <span class="n">seed</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s2">&quot;-&quot;</span><span class="p">,</span> <span class="s2">&quot;&quot;</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="n">hashlib</span><span class="o">.</span><span class="n">md5</span><span class="p">(</span><span class="n">seed</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">hexdigest</span><span class="p">())</span>
<span class="k">class</span> <span class="nc">_ShardDestinations</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Adds a shard number to the key of the KV element.</span>
<span class="sd"> Experimental; no backwards compatibility guarantees.&quot;&quot;&quot;</span><!-- Input elements are (destination, row) pairs; each output is
  ((destination, shard), row) with shard in [0, sharding_factor). -->
<span class="n">DEFAULT_SHARDING_FACTOR</span> <span class="o">=</span> <span class="mi">10</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sharding_factor</span><span class="o">=</span><span class="n">DEFAULT_SHARDING_FACTOR</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sharding_factor</span> <span class="o">=</span> <span class="n">sharding_factor</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_shard_count</span> <span class="o">=</span> <span class="n">random</span><span class="o">.</span><span class="n">randrange</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sharding_factor</span><span class="p">)</span><!-- Each bundle starts at a random offset, and process then assigns shards
  round-robin (counter modulo sharding_factor); presumably this keeps every
  bundle from starting at shard 0. NOTE(review): random.randrange raises
  ValueError when sharding_factor is 0 or negative, so such values fail here. -->
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">row</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="n">sharded_destination</span> <span class="o">=</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_shard_count</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">sharding_factor</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_shard_count</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">sharded_destination</span><span class="p">,</span> <span class="n">row</span><span class="p">)</span>
<div class="viewcode-block" id="WriteRecordsToFile"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile">[docs]</a><span class="k">class</span> <span class="nc">WriteRecordsToFile</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Write input records to files before triggering a load job.</span>
<span class="sd"> This transform keeps up to ``max_files_per_bundle`` files open to write to. It</span>
<span class="sd"> receives (destination, record) tuples, and it writes the records to different</span>
<span class="sd"> files for each destination.</span>
<span class="sd"> If there are more than ``max_files_per_bundle`` destinations that we need to</span>
<span class="sd"> write to, then those records are grouped by their destination, and later</span>
<span class="sd"> written to files by ``WriteGroupedRecordsToFile``.</span>
<span class="sd"> It outputs two PCollections.</span><!-- The two outputs are the tagged outputs WRITTEN_FILE_TAG ('WrittenFiles'),
  carrying (destination, file_path) pairs, and UNWRITTEN_RECORD_TAG
  ('UnwrittenRecords'), carrying the unchanged (destination, row) elements that
  could not get a writer. Naming them in the module docstring would help. -->
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">UNWRITTEN_RECORD_TAG</span> <span class="o">=</span> <span class="s1">&#39;UnwrittenRecords&#39;</span>
<span class="n">WRITTEN_FILE_TAG</span> <span class="o">=</span> <span class="s1">&#39;WrittenFiles&#39;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span>
<span class="n">max_files_per_bundle</span><span class="o">=</span><span class="n">_DEFAULT_MAX_WRITERS_PER_BUNDLE</span><span class="p">,</span>
<span class="n">max_file_size</span><span class="o">=</span><span class="n">_DEFAULT_MAX_FILE_SIZE</span><span class="p">,</span>
<span class="n">coder</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initialize a :class:`WriteRecordsToFile`.</span>
<span class="sd"> Args:</span>
<span class="sd"> max_files_per_bundle (int): The maximum number of files that can be kept</span>
<span class="sd"> open during execution of this step in a worker. This is to avoid over-</span>
<span class="sd"> whelming the worker memory.</span>
<span class="sd"> max_file_size (int): The maximum size in bytes for a file to be used in</span>
<span class="sd"> an export job.</span><!-- NOTE(review): these files feed BigQuery load jobs (see the class and module
  docstrings), so "export job" looks like a typo for "load job". The coder
  argument is undocumented: when falsy it defaults to
  bigquery_tools.RowAsDictJsonCoder(). Fix both in the module source. -->
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span> <span class="o">=</span> <span class="n">max_files_per_bundle</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span> <span class="o">=</span> <span class="n">max_file_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">coder</span> <span class="o">=</span> <span class="n">coder</span> <span class="ow">or</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">RowAsDictJsonCoder</span><span class="p">()</span>
<div class="viewcode-block" id="WriteRecordsToFile.display_data"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">&#39;max_files_per_bundle&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span><span class="p">,</span>
<span class="s1">&#39;max_file_size&#39;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">),</span>
<span class="s1">&#39;coder&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">coder</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span>
<span class="p">}</span></div>
<div class="viewcode-block" id="WriteRecordsToFile.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span> <span class="o">=</span> <span class="p">{}</span></div>
<div class="viewcode-block" id="WriteRecordsToFile.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">file_prefix</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Take a tuple with (destination, row) and write to file or spill out.</span>
<span class="sd"> Destination may be a ``TableReference`` or a string, and row is a</span>
<span class="sd"> Python dictionary for a row to be inserted to BigQuery.&quot;&quot;&quot;</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">get_hashable_destination</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="n">row</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="k">if</span> <span class="n">destination</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">:</span>
<span class="n">writer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">[</span><span class="n">destination</span><span class="p">]</span>
<span class="k">elif</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">)</span> <span class="o">&lt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span><span class="p">:</span><!-- A new file is opened only while fewer than max_files_per_bundle writers
  are open. Its path is emitted on WRITTEN_FILE_TAG right away, before any row
  has been written to it. Once the limit is reached, elements for destinations
  without an open writer are emitted unchanged on UNWRITTEN_RECORD_TAG. -->
<span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span><span class="p">)</span> <span class="o">=</span> <span class="n">_make_new_file_writer</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">[</span><span class="n">destination</span><span class="p">]</span> <span class="o">=</span> <span class="n">writer</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span><span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">,</span>
<span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">file_path</span><span class="p">))</span><!-- The emitted key is the original element[0], whereas the writer map above
  is keyed by get_hashable_destination(element[0]). -->
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span>
<span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">UNWRITTEN_RECORD_TAG</span><span class="p">,</span> <span class="n">element</span><span class="p">)</span>
<span class="k">return</span>
<span class="c1"># TODO(pabloem): Is it possible for this to throw exception?</span>
<span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">row</span><span class="p">))</span>
<span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="sa">b</span><span class="s1">&#39;</span><span class="se">\n</span><span class="s1">&#39;</span><span class="p">)</span><!-- Each row is written as self.coder.encode(row) followed by a newline, one
  encoded row per line (presumably newline-delimited JSON with the default
  RowAsDictJsonCoder; confirm against that coder). -->
<span class="k">if</span> <span class="n">writer</span><span class="o">.</span><span class="n">tell</span><span class="p">()</span> <span class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">:</span><!-- NOTE(review): the size check runs after the row and newline are written,
  so a file can exceed max_file_size by up to one row. After the writer is
  closed and removed, the next row for this destination opens a new file (and
  emits another WRITTEN_FILE_TAG) if a writer slot is free, or goes to
  UNWRITTEN_RECORD_TAG otherwise. -->
<span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span></div>
<div class="viewcode-block" id="WriteRecordsToFile.finish_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.finish_bundle">[docs]</a> <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- Closes every writer still open at the end of the bundle and resets the map.
  NOTE(review): this is the only place in this class where open writers are
  closed; if a bundle fails before finish_bundle runs, those writers are
  presumably abandoned. Confirm whether a teardown is needed. -->
<span class="k">for</span> <span class="n">_</span><span class="p">,</span> <span class="n">writer</span> <span class="ow">in</span> <span class="n">iteritems</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">):</span>
<span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span> <span class="o">=</span> <span class="p">{}</span></div></div>
<div class="viewcode-block" id="WriteGroupedRecordsToFile"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteGroupedRecordsToFile">[docs]</a><span class="k">class</span> <span class="nc">WriteGroupedRecordsToFile</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span><!-- Sphinx viewcode listing of WriteGroupedRecordsToFile; the spans must mirror the Python module exactly, so fix code there and regenerate. -->
<span class="sd">&quot;&quot;&quot;Receives collection of dest-iterable(records), writes it to files.</span>
<span class="sd"> This is different from ``WriteRecordsToFile`` because it receives records</span>
<span class="sd"> grouped by destination. This means that it&#39;s not necessary to keep multiple</span>
<span class="sd"> file descriptors open, because we know for sure when records for a single</span>
<span class="sd"> destination have been written out.</span>
<span class="sd"> Experimental; no backwards compatibility guarantees.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">max_file_size</span><span class="o">=</span><span class="n">_DEFAULT_MAX_FILE_SIZE</span><span class="p">,</span>
<span class="n">coder</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span><!-- max_file_size is compared against writer.tell() in process; coder falls back to bigquery_tools.RowAsDictJsonCoder() when None. -->
<span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span> <span class="o">=</span> <span class="n">max_file_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">coder</span> <span class="o">=</span> <span class="n">coder</span> <span class="ow">or</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">RowAsDictJsonCoder</span><span class="p">()</span>
<div class="viewcode-block" id="WriteGroupedRecordsToFile.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteGroupedRecordsToFile.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">file_prefix</span><span class="p">):</span><!-- process: element is (destination, rows). Writes each encoded row plus a newline, and yields (destination, file_path) once per file it opens. -->
<span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">rows</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="n">writer</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">for</span> <span class="n">row</span> <span class="ow">in</span> <span class="n">rows</span><span class="p">:</span>
<span class="k">if</span> <span class="n">writer</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span><span class="p">)</span> <span class="o">=</span> <span class="n">_make_new_file_writer</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">)</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">file_path</span><span class="p">)</span><!-- the path is emitted as soon as the file is opened, before any row has been written to it. -->
<span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">row</span><span class="p">))</span>
<span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="sa">b</span><span class="s1">&#39;</span><span class="se">\n</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">writer</span><span class="o">.</span><span class="n">tell</span><span class="p">()</span> <span class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">:</span><!-- roll over once the size exceeds max_file_size; the row that crossed the threshold stays in the current file. -->
<span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="n">writer</span> <span class="o">=</span> <span class="kc">None</span></div></div><!-- NOTE(review): when the loop ends, the last writer is never closed unless its size had exceeded max_file_size, so the file handle leaks and buffered rows may never reach the file. The Python source likely needs a trailing "if writer is not None: writer.close()" after the loop; confirm against upstream Beam. -->
<div class="viewcode-block" id="TriggerCopyJobs"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs">[docs]</a><span class="k">class</span> <span class="nc">TriggerCopyJobs</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span><!-- Sphinx viewcode listing of TriggerCopyJobs. For each (destination, load job reference) it starts a copy job from the table named after the load job id into the destination table. Edit the Python module, not these spans. -->
<span class="sd">&quot;&quot;&quot;Launches jobs to copy from temporary tables into the main target table.</span>
<span class="sd"> When a job needs to write to multiple destination tables, or when a single</span>
<span class="sd"> destination table needs to have multiple load jobs to write to it, files are</span>
<span class="sd"> loaded into temporary tables, and those tables are later copied to the</span>
<span class="sd"> destination tables.</span>
<span class="sd"> This transform emits (destination, job_reference) pairs.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">temporary_tables</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span><!-- the dispositions are forwarded unchanged to every copy job; test_client is passed to BigQueryWrapper in start_bundle. -->
<span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="n">create_disposition</span>
<span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span>
<span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span>
<span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span> <span class="o">=</span> <span class="n">temporary_tables</span>
<div class="viewcode-block" id="TriggerCopyJobs.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span></div><!-- a new BigQueryWrapper is created for every bundle. -->
<div class="viewcode-block" id="TriggerCopyJobs.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">job_name_prefix</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">job_reference</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span><span class="p">:</span>
<span class="c1"># If we did not use temporary tables, then we do not need to trigger any</span>
<span class="c1"># copy jobs.</span>
<span class="k">return</span><!-- no output at all is emitted when temporary_tables is False. -->
<span class="n">copy_to_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">if</span> <span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span><span class="s1">&#39;project&#39;</span><span class="p">,</span>
<span class="nb">str</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span><!-- a destination without a project falls back to the runtime 'project' option (default empty string). -->
<span class="n">copy_from_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="n">copy_from_reference</span><span class="o">.</span><span class="n">tableId</span> <span class="o">=</span> <span class="n">job_reference</span><span class="o">.</span><span class="n">jobId</span><!-- the source table shares dataset and project with the destination and is named after the load job id. -->
<span class="k">if</span> <span class="n">copy_from_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">copy_from_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span>
<span class="s1">&#39;project&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="n">copy_job_name</span> <span class="o">=</span> <span class="s1">&#39;</span><span class="si">%s</span><span class="s1">_copy_</span><span class="si">%s</span><span class="s1">_to_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span>
<span class="n">job_name_prefix</span><span class="p">,</span>
<span class="n">_bq_uuid</span><span class="p">(</span><span class="s1">&#39;</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">.</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">copy_from_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">copy_from_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span>
<span class="n">copy_from_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)),</span>
<span class="n">_bq_uuid</span><span class="p">(</span><span class="s1">&#39;</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">.</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">copy_to_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span>
<span class="n">copy_to_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)))</span><!-- job name is prefix + _bq_uuid(source table) + _bq_uuid(destination table); presumably _bq_uuid hashes its seed deterministically so a retried copy reuses the same name, TODO confirm. -->
<span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Triggering copy job from </span><span class="si">%s</span><span class="s2"> to </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span>
<span class="n">copy_from_reference</span><span class="p">,</span> <span class="n">copy_to_reference</span><span class="p">)</span>
<span class="n">job_reference</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">_insert_copy_job</span><span class="p">(</span>
<span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">copy_job_name</span><span class="p">,</span>
<span class="n">copy_from_reference</span><span class="p">,</span>
<span class="n">copy_to_reference</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">)</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span><span class="p">)</span></div></div><!-- emits (destination, copy job reference); job_reference was rebound to the copy job above. -->
<div class="viewcode-block" id="TriggerLoadJobs"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs">[docs]</a><span class="k">class</span> <span class="nc">TriggerLoadJobs</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span><!-- Sphinx viewcode listing of TriggerLoadJobs. For each (destination, files) element it starts one load job per batch of at most _MAXIMUM_SOURCE_URIS files and yields (destination, job_reference); in temporary-table mode it also yields the table reference on the TEMP_TABLES tagged output. Edit the Python module, not these spans. -->
<span class="sd">&quot;&quot;&quot;Triggers the import jobs to BQ.</span>
<span class="sd"> Experimental; no backwards compatibility guarantees.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">TEMP_TABLES</span> <span class="o">=</span> <span class="s1">&#39;TemporaryTables&#39;</span><!-- tag of the side output that carries temporary table references. -->
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span>
<span class="n">schema</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">temporary_tables</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">additional_bq_parameters</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span><!-- schema and additional_bq_parameters may each be a plain value, a ValueProvider, or a callable of the destination (see process). -->
<span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span>
<span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span>
<span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span> <span class="o">=</span> <span class="n">temporary_tables</span>
<span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span> <span class="o">=</span> <span class="n">additional_bq_parameters</span> <span class="ow">or</span> <span class="p">{}</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span><span class="p">:</span>
<span class="c1"># If we are loading into temporary tables, we rely on the default create</span>
<span class="c1"># and write dispositions, which mean that a new table will be created.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="n">create_disposition</span>
<span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span>
<div class="viewcode-block" id="TriggerLoadJobs.display_data"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">result</span> <span class="o">=</span> <span class="p">{</span><span class="s1">&#39;create_disposition&#39;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">),</span>
<span class="s1">&#39;write_disposition&#39;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">)}</span>
<span class="n">result</span><span class="p">[</span><span class="s1">&#39;schema&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span>
<span class="k">return</span> <span class="n">result</span></div>
<div class="viewcode-block" id="TriggerLoadJobs.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span></div>
<div class="viewcode-block" id="TriggerLoadJobs.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">load_job_name_prefix</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">):</span><!-- schema_side_inputs are only used when self.schema is callable; they are passed after the destination. -->
<span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">files</span> <span class="o">=</span> <span class="nb">iter</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span>
<span class="k">if</span> <span class="n">callable</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">):</span>
<span class="n">schema</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span>
<span class="n">schema</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">schema</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><!-- schema resolution order: callable first, then ValueProvider.get(), else the value as-is. -->
<span class="k">if</span> <span class="n">callable</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">):</span>
<span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span>
<span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><!-- same resolution order as schema, except the callable receives only the destination (no side inputs). -->
<span class="n">batch_of_files</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">itertools</span><span class="o">.</span><span class="n">islice</span><span class="p">(</span><span class="n">files</span><span class="p">,</span> <span class="n">_MAXIMUM_SOURCE_URIS</span><span class="p">))</span><!-- files is one shared iterator, so each islice call consumes the next batch of up to _MAXIMUM_SOURCE_URIS paths. -->
<span class="k">while</span> <span class="n">batch_of_files</span><span class="p">:</span>
<span class="n">table_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">if</span> <span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span>
<span class="s1">&#39;project&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="c1"># Load jobs for a single destination are always triggered from the same</span>
<span class="c1"># worker. This means that we can generate a deterministic numbered job id,</span>
<span class="c1"># and not need to worry.</span>
<span class="n">destination_hash</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">(</span><span class="s1">&#39;</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">.</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">))</span>
<span class="n">timestamp</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">())</span>
<span class="n">job_name</span> <span class="o">=</span> <span class="s1">&#39;</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span>
<span class="n">load_job_name_prefix</span><span class="p">,</span> <span class="n">destination_hash</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">)</span><!-- NOTE(review): job_name depends only on the prefix, the destination hash and a whole-second timestamp, not on a batch number as the comment above claims. When a destination has more than _MAXIMUM_SOURCE_URIS files and two batches start within the same second, both get the same job name (and, with temporary_tables, the same table id). BigQuery job ids must be unique within a project, so the second insert would presumably be rejected. Appending a batch index or random suffix in the Python source would avoid this; confirm against upstream Beam. -->
<span class="n">logging</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;Batch of files has </span><span class="si">%s</span><span class="s1"> files. Job name is </span><span class="si">%s</span><span class="s1">.&#39;</span><span class="p">,</span>
<span class="nb">len</span><span class="p">(</span><span class="n">batch_of_files</span><span class="p">),</span> <span class="n">job_name</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span><span class="p">:</span>
<span class="c1"># For temporary tables, we create a new table with the name with JobId.</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span> <span class="o">=</span> <span class="n">job_name</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span><span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">TEMP_TABLES</span><span class="p">,</span> <span class="n">table_reference</span><span class="p">)</span><!-- the temporary table reference is emitted before its load job is started. -->
<span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Triggering job </span><span class="si">%s</span><span class="s1"> to load data to BigQuery table </span><span class="si">%s</span><span class="s1">.&#39;</span>
<span class="s1">&#39;Schema: </span><span class="si">%s</span><span class="s1">. Additional parameters: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">job_name</span><span class="p">,</span> <span class="n">table_reference</span><span class="p">,</span>
<span class="n">schema</span><span class="p">,</span> <span class="n">additional_parameters</span><span class="p">)</span>
<span class="n">job_reference</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">perform_load_job</span><span class="p">(</span>
<span class="n">table_reference</span><span class="p">,</span> <span class="n">batch_of_files</span><span class="p">,</span> <span class="n">job_name</span><span class="p">,</span>
<span class="n">schema</span><span class="o">=</span><span class="n">schema</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span>
<span class="n">additional_load_parameters</span><span class="o">=</span><span class="n">additional_parameters</span><span class="p">)</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span><span class="p">)</span>
<span class="c1"># Prepare to trigger the next job</span>
<span class="n">batch_of_files</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">itertools</span><span class="o">.</span><span class="n">islice</span><span class="p">(</span><span class="n">files</span><span class="p">,</span> <span class="n">_MAXIMUM_SOURCE_URIS</span><span class="p">))</span></div></div><!-- the loop ends once the files iterator is exhausted and islice returns an empty batch. -->
<div class="viewcode-block" id="WaitForBQJobs"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WaitForBQJobs">[docs]</a><span class="k">class</span> <span class="nc">WaitForBQJobs</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Takes in a series of BQ job names as side input, and waits for all of them.</span>
<span class="sd"> If any job fails, it will fail. If all jobs succeed, it will succeed.</span>
<span class="sd"> Experimental; no backwards compatibility guarantees.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">ALL_DONE</span> <span class="o">=</span> <span class="nb">object</span><span class="p">()</span>
<span class="n">FAILED</span> <span class="o">=</span> <span class="nb">object</span><span class="p">()</span>
<span class="n">WAITING</span> <span class="o">=</span> <span class="nb">object</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">test_client</span><span class="p">):</span><!-- test_client is stored and later handed to BigQueryWrapper as its client. -->
<span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span>
<div class="viewcode-block" id="WaitForBQJobs.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WaitForBQJobs.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- builds a fresh BigQueryWrapper for each bundle, passing self.test_client as its client. -->
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span></div>
<div class="viewcode-block" id="WaitForBQJobs.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WaitForBQJobs.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">dest_ids_list</span><span class="p">):</span><!-- process ignores element; dest_ids_list holds (destination, job_reference) pairs. It polls every 10 seconds until all jobs are done or one has failed. -->
<span class="n">job_references</span> <span class="o">=</span> <span class="p">[</span><span class="n">elm</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> <span class="k">for</span> <span class="n">elm</span> <span class="ow">in</span> <span class="n">dest_ids_list</span><span class="p">]</span>
<span class="k">while</span> <span class="kc">True</span><span class="p">:</span><!-- no timeout: this loop only exits on ALL_DONE or by raising on FAILED. -->
<span class="n">status</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_check_job_states</span><span class="p">(</span><span class="n">job_references</span><span class="p">)</span>
<span class="k">if</span> <span class="n">status</span> <span class="o">==</span> <span class="n">WaitForBQJobs</span><span class="o">.</span><span class="n">FAILED</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">Exception</span><span class="p">(</span>
<span class="s1">&#39;BigQuery jobs failed. BQ error: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_latest_error</span><span class="p">)</span><!-- NOTE(review): the message and self._latest_error are passed to Exception as two separate args, so the %s placeholder is never filled in. The Python source should format the string with % instead of a comma. _latest_error is presumably set by _check_job_states (outside this view). -->
<span class="k">elif</span> <span class="n">status</span> <span class="o">==</span> <span class="n">WaitForBQJobs</span><span class="o">.</span><span class="n">ALL_DONE</span><span class="p">:</span>
<span class="k">return</span> <span class="n">dest_ids_list</span> <span class="c1"># Pass the list of destination-jobs downstream</span><!-- a list returned from DoFn.process is emitted element by element, so each (destination, job_reference) pair becomes its own output. -->
<span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="mi">10</span><span class="p">)</span></div>
<span class="k">def</span> <span class="nf">_check_job_states</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">job_references</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Fetches every job once and reports their combined state.</span>

<span class="sd">Returns:</span>
<span class="sd">  ``WaitForBQJobs.FAILED`` as soon as a job is DONE with an error result</span>
<span class="sd">  (that job&#39;s status is stored in ``self._latest_error``);</span>
<span class="sd">  ``WaitForBQJobs.ALL_DONE`` when every job is DONE without an error;</span>
<span class="sd">  ``None`` when no job has failed but at least one is not DONE yet</span>
<span class="sd">  (e.g. PENDING or RUNNING), so that the caller keeps polling.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="n">still_running</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">for</span> <span class="n">ref</span> <span class="ow">in</span> <span class="n">job_references</span><span class="p">:</span>
<span class="n">job</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">get_job</span><span class="p">(</span><span class="n">ref</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">ref</span><span class="o">.</span><span class="n">jobId</span><span class="p">,</span>
<span class="n">ref</span><span class="o">.</span><span class="n">location</span><span class="p">)</span>
<span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Job status: </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">job</span><span class="o">.</span><span class="n">status</span><span class="p">)</span>
<span class="k">if</span> <span class="n">job</span><span class="o">.</span><span class="n">status</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="s1">&#39;DONE&#39;</span> <span class="ow">and</span> <span class="n">job</span><span class="o">.</span><span class="n">status</span><span class="o">.</span><span class="n">errorResult</span><span class="p">:</span>
<span class="n">logging</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">&quot;Job </span><span class="si">%s</span><span class="s2"> seems to have failed. Error Result: </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span>
<span class="n">ref</span><span class="o">.</span><span class="n">jobId</span><span class="p">,</span> <span class="n">job</span><span class="o">.</span><span class="n">status</span><span class="o">.</span><span class="n">errorResult</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_latest_error</span> <span class="o">=</span> <span class="n">job</span><span class="o">.</span><span class="n">status</span>
<span class="k">return</span> <span class="n">WaitForBQJobs</span><span class="o">.</span><span class="n">FAILED</span>
<span class="k">elif</span> <span class="n">job</span><span class="o">.</span><span class="n">status</span><span class="o">.</span><span class="n">state</span> <span class="o">!=</span> <span class="s1">&#39;DONE&#39;</span><span class="p">:</span>
<span class="c1"># Not finished yet. Keep scanning so that a failure of a later job</span>
<span class="c1"># is still reported immediately, but do not report ALL_DONE.</span>
<span class="n">still_running</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">if</span> <span class="n">still_running</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">None</span>
<span class="k">return</span> <span class="n">WaitForBQJobs</span><span class="o">.</span><span class="n">ALL_DONE</span></div>
<div class="viewcode-block" id="DeleteTablesFn"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn">[docs]</a><span class="k">class</span> <span class="nc">DeleteTablesFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span><!-- DeleteTablesFn: DoFn that deletes the BigQuery table named by each input element. process() neither returns nor yields anything, so no elements are emitted. -->
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span><!-- test_client is only stored here; start_bundle() passes it to BigQueryWrapper as its client. -->
<span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span>
<div class="viewcode-block" id="DeleteTablesFn.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- A new BigQueryWrapper is built at the start of every bundle. -->
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span></div>
<div class="viewcode-block" id="DeleteTablesFn.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">table_reference</span><span class="p">):</span><!-- process: logs the raw reference, parses it, then deletes the table through the wrapper using its project, dataset and table ids. -->
<span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Deleting table </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">table_reference</span><span class="p">)</span>
<span class="n">table_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">table_reference</span><span class="p">)</span><!-- Rebinds table_reference to the parsed object whose projectId, datasetId and tableId are read below; the accepted input formats are defined by parse_table_reference, which is not shown here. -->
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">_delete_table</span><span class="p">(</span><!-- NOTE(review): _delete_table is a private BigQueryWrapper method (leading underscore). -->
<span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)</span></div></div>
<div class="viewcode-block" id="BigQueryBatchFileLoads"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads">[docs]</a><span class="k">class</span> <span class="nc">BigQueryBatchFileLoads</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Takes in a set of elements, and inserts them to BigQuery via batch loads.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">DESTINATION_JOBID_PAIRS</span> <span class="o">=</span> <span class="s1">&#39;destination_load_jobid_pairs&#39;</span>
<span class="n">DESTINATION_FILE_PAIRS</span> <span class="o">=</span> <span class="s1">&#39;destination_file_pairs&#39;</span>
<span class="n">DESTINATION_COPY_JOBID_PAIRS</span> <span class="o">=</span> <span class="s1">&#39;destination_copy_jobid_pairs&#39;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">destination</span><span class="p">,</span>
<span class="n">schema</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">custom_gcs_temp_location</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">triggering_frequency</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">coder</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">max_file_size</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">max_files_per_bundle</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">additional_bq_parameters</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">table_side_inputs</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">schema_side_inputs</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">validate</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">is_streaming_pipeline</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Stores the load configuration and verifies it when ``validate`` is set.</span>

<span class="sd">Args:</span>
<span class="sd">  destination: The destination table, or a callable (presumably choosing</span>
<span class="sd">    the table per element). A callable means several load jobs, so</span>
<span class="sd">    temporary tables are used.</span>
<span class="sd">  schema: Table schema, passed to ``TriggerLoadJobs``.</span>
<span class="sd">  custom_gcs_temp_location: ``str``, ``None`` or ``ValueProvider`` for the</span>
<span class="sd">    files to load. ``str`` and ``None`` are wrapped in a</span>
<span class="sd">    ``StaticValueProvider`` (``None`` becomes ``&#39;&#39;``).</span>
<span class="sd">  create_disposition: Passed to ``TriggerLoadJobs``.</span>
<span class="sd">  write_disposition: Passed to ``TriggerLoadJobs``.</span>
<span class="sd">  triggering_frequency: Required for, and only allowed in, streaming</span>
<span class="sd">    pipelines (checked by ``verify``).</span>
<span class="sd">  coder: Record coder; defaults to ``bigquery_tools.RowAsDictJsonCoder()``.</span>
<span class="sd">  max_file_size: Falsy values select ``_DEFAULT_MAX_FILE_SIZE``.</span>
<span class="sd">  max_files_per_bundle: Falsy values select</span>
<span class="sd">    ``_DEFAULT_MAX_WRITERS_PER_BUNDLE``.</span>
<span class="sd">  additional_bq_parameters: Extra load-job parameters; defaults to ``{}``.</span>
<span class="sd">  table_side_inputs: Side inputs for destination resolution; default ``()``.</span>
<span class="sd">  schema_side_inputs: Side inputs for schema resolution; default ``()``.</span>
<span class="sd">  test_client: BigQuery client override handed to the DoFns.</span>
<span class="sd">  validate: If true, ``verify()`` runs immediately.</span>
<span class="sd">  is_streaming_pipeline: Selects the streaming windowing and triggering.</span>

<span class="sd">Raises:</span>
<span class="sd">  ValueError: for an unsupported ``custom_gcs_temp_location`` type, or</span>
<span class="sd">    from ``verify()``.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">destination</span> <span class="o">=</span> <span class="n">destination</span>
<span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="n">create_disposition</span>
<span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span>
<span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span> <span class="o">=</span> <span class="n">triggering_frequency</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span> <span class="o">=</span> <span class="n">max_file_size</span> <span class="ow">or</span> <span class="n">_DEFAULT_MAX_FILE_SIZE</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span> <span class="o">=</span> <span class="p">(</span><span class="n">max_files_per_bundle</span> <span class="ow">or</span>
<span class="n">_DEFAULT_MAX_WRITERS_PER_BUNDLE</span><span class="p">)</span>
<span class="k">if</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">custom_gcs_temp_location</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span>
<span class="ow">or</span> <span class="n">custom_gcs_temp_location</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">StaticValueProvider</span><span class="p">(</span>
<span class="nb">str</span><span class="p">,</span> <span class="n">custom_gcs_temp_location</span> <span class="ow">or</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">custom_gcs_temp_location</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span> <span class="o">=</span> <span class="n">custom_gcs_temp_location</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;custom_gcs_temp_location must be str or ValueProvider&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span>
<span class="bp">self</span><span class="o">.</span><span class="n">coder</span> <span class="o">=</span> <span class="n">coder</span> <span class="ow">or</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">RowAsDictJsonCoder</span><span class="p">()</span>
<span class="c1"># If we have multiple destinations, then we will have multiple load jobs,</span>
<span class="c1"># thus we will need temporary tables for atomicity.</span>
<span class="c1"># If the destination is a single one, we assume that we will have only one</span>
<span class="c1"># job to run - and thus we avoid using temporary tables</span>
<span class="bp">self</span><span class="o">.</span><span class="n">temp_tables</span> <span class="o">=</span> <span class="n">callable</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span> <span class="o">=</span> <span class="n">additional_bq_parameters</span> <span class="ow">or</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">table_side_inputs</span> <span class="o">=</span> <span class="n">table_side_inputs</span> <span class="ow">or</span> <span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span> <span class="o">=</span> <span class="n">schema_side_inputs</span> <span class="ow">or</span> <span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="o">=</span> <span class="n">is_streaming_pipeline</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_validate</span> <span class="o">=</span> <span class="n">validate</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_validate</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">verify</span><span class="p">()</span>
<div class="viewcode-block" id="BigQueryBatchFileLoads.verify"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads.verify">[docs]</a> <span class="k">def</span> <span class="nf">verify</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Checks the transform&#39;s configuration and raises on invalid settings.</span>

<span class="sd">Raises:</span>
<span class="sd">  ValueError: if a non-empty static ``custom_gcs_temp_location`` does not</span>
<span class="sd">    start with ``gs://``; if a streaming pipeline has no</span>
<span class="sd">    ``triggering_frequency``; or if a batch pipeline sets one.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="n">custom_location</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span>
<span class="c1"># Test the provider itself: .get() returns the plain string, which is</span>
<span class="c1"># never a StaticValueProvider, so checking its result disabled this</span>
<span class="c1"># validation. Other ValueProvider kinds are skipped because their value</span>
<span class="c1"># may only be readable at run time.</span>
<span class="k">if</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">custom_location</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">StaticValueProvider</span><span class="p">)</span> <span class="ow">and</span>
<span class="n">custom_location</span><span class="o">.</span><span class="n">get</span><span class="p">()</span> <span class="ow">and</span>
<span class="ow">not</span> <span class="n">custom_location</span><span class="o">.</span><span class="n">get</span><span class="p">()</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;gs://&#39;</span><span class="p">)):</span>
<span class="c1"># Only fail if the custom location is provided, and it is not a GCS</span>
<span class="c1"># location.</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Invalid GCS location: </span><span class="si">%r</span><span class="s1">.</span><span class="se">\n</span><span class="s1">&#39;</span>
<span class="s1">&#39;Writing to BigQuery with FILE_LOADS method requires a &#39;</span>
<span class="s1">&#39;GCS location to be provided to write files to be &#39;</span>
<span class="s1">&#39;loaded into BigQuery. Please provide a GCS bucket, or &#39;</span>
<span class="s1">&#39;pass method=&quot;STREAMING_INSERTS&quot; to WriteToBigQuery.&#39;</span>
<span class="o">%</span> <span class="n">custom_location</span><span class="o">.</span><span class="n">get</span><span class="p">())</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;triggering_frequency must be specified to use file &#39;</span>
<span class="s1">&#39;loads in streaming&#39;</span><span class="p">)</span>
<span class="k">elif</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;triggering_frequency can only be used with file &#39;</span>
<span class="s1">&#39;loads in streaming&#39;</span><span class="p">)</span></div>
<span class="k">def</span> <span class="nf">_window_fn</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- _window_fn: returns the WindowInto transform that expand() applies as its RewindowIntoGlobal step, before destinations are appended. -->
<span class="sd">&quot;&quot;&quot;Set the correct WindowInto PTransform&quot;&quot;&quot;</span>
<span class="c1"># The user-supplied triggering_frequency is often chosen to control how</span>
<span class="c1"># many BigQuery load jobs are triggered, to prevent going over BigQuery&#39;s</span>
<span class="c1"># daily quota for load jobs. If this is set to a large value, currently we</span>
<span class="c1"># have to buffer all the data until the trigger fires. Instead we ensure</span>
<span class="c1"># that the files are written if a threshold number of records are ready.</span>
<span class="c1"># We use only the user-supplied trigger on the actual BigQuery load.</span>
<span class="c1"># This allows us to offload the data to the filesystem.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span><span class="p">:</span><!-- Streaming: global window whose trigger repeatedly fires when either triggering_frequency of processing time has passed or _FILE_TRIGGERING_RECORD_COUNT elements have arrived, whichever happens first; fired panes are discarded. -->
<span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">(),</span>
<span class="n">trigger</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">Repeatedly</span><span class="p">(</span>
<span class="n">trigger</span><span class="o">.</span><span class="n">AfterAny</span><span class="p">(</span>
<span class="n">trigger</span><span class="o">.</span><span class="n">AfterProcessingTime</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">),</span>
<span class="n">trigger</span><span class="o">.</span><span class="n">AfterCount</span><span class="p">(</span>
<span class="n">_FILE_TRIGGERING_RECORD_COUNT</span><span class="p">))),</span>
<span class="n">accumulation_mode</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">AccumulationMode</span>\
<span class="o">.</span><span class="n">DISCARDING</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span><!-- Batch: global window; no explicit trigger is passed. -->
<span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span>
<span class="k">def</span> <span class="nf">_write_files</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">destination_data_kv_pc</span><span class="p">,</span> <span class="n">file_prefix_pcv</span><span class="p">):</span><!-- _write_files: writes the (destination, record) pairs to files named from the file_prefix_pcv side input and returns a single PCollection of (destination, file) pairs. -->
<span class="n">outputs</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">destination_data_kv_pc</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><!-- First pass: WriteRecordsToFile writes what it can and emits the remaining records under UNWRITTEN_RECORD_TAG (presumably limited by max_files_per_bundle and max_file_size). -->
<span class="n">WriteRecordsToFile</span><span class="p">(</span><span class="n">max_files_per_bundle</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span><span class="p">,</span>
<span class="n">max_file_size</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">,</span>
<span class="n">coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">coder</span><span class="p">),</span>
<span class="n">file_prefix</span><span class="o">=</span><span class="n">file_prefix_pcv</span><span class="p">)</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span>
<span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">UNWRITTEN_RECORD_TAG</span><span class="p">,</span>
<span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">))</span>
<span class="c1"># A PCollection of (destination, file) tuples. It lists files with records,</span>
<span class="c1"># and the destination each file is meant to be imported into.</span>
<span class="n">destination_files_kv_pc</span> <span class="o">=</span> <span class="n">outputs</span><span class="p">[</span><span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">]</span>
<span class="c1"># A PCollection of (destination, record) tuples. These are later sharded,</span>
<span class="c1"># grouped, and all records for each destination-shard is written to files.</span>
<span class="c1"># This PCollection is necessary because not all records can be written into</span>
<span class="c1"># files in ``WriteRecordsToFile``.</span>
<span class="n">unwritten_records_pc</span> <span class="o">=</span> <span class="n">outputs</span><span class="p">[</span><span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">UNWRITTEN_RECORD_TAG</span><span class="p">]</span>
<span class="n">more_destination_files_kv_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">unwritten_records_pc</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">_ShardDestinations</span><span class="p">())</span><!-- Second pass: shard and group the leftover records so that each group is written by WriteGroupedRecordsToFile. -->
<span class="o">|</span> <span class="s2">&quot;GroupShardedRows&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span>
<span class="o">|</span> <span class="s2">&quot;DropShardNumber&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="n">x</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">],</span> <span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">]))</span><!-- After GroupByKey each element is (key, records); x[0][0] keeps the first component of the key, presumably the destination of a (destination, shard) key built by _ShardDestinations. -->
<span class="o">|</span> <span class="s2">&quot;WriteGroupedRecordsToFile&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">WriteGroupedRecordsToFile</span><span class="p">(</span>
<span class="n">coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">coder</span><span class="p">),</span> <span class="n">file_prefix</span><span class="o">=</span><span class="n">file_prefix_pcv</span><span class="p">))</span>
<span class="n">all_destination_file_pairs_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="p">(</span><span class="n">destination_files_kv_pc</span><span class="p">,</span> <span class="n">more_destination_files_kv_pc</span><span class="p">)</span>
<span class="o">|</span> <span class="s2">&quot;DestinationFilesUnion&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">())</span><!-- Merge the files produced by both passes. -->
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span><span class="p">:</span><!-- Streaming only: re-window into global windows with a trigger that fires repeatedly once triggering_frequency of processing time has passed AND at least one element is present; fired panes are discarded. -->
<span class="c1"># Apply the user&#39;s trigger back before we start triggering load jobs</span>
<span class="n">all_destination_file_pairs_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">all_destination_file_pairs_pc</span>
<span class="o">|</span> <span class="s2">&quot;ApplyUserTrigger&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span>
<span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">(),</span>
<span class="n">trigger</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">Repeatedly</span><span class="p">(</span>
<span class="n">trigger</span><span class="o">.</span><span class="n">AfterAll</span><span class="p">(</span>
<span class="n">trigger</span><span class="o">.</span><span class="n">AfterProcessingTime</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">),</span>
<span class="n">trigger</span><span class="o">.</span><span class="n">AfterCount</span><span class="p">(</span><span class="mi">1</span><span class="p">))),</span>
<span class="n">accumulation_mode</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">AccumulationMode</span><span class="o">.</span><span class="n">DISCARDING</span><span class="p">))</span>
<span class="k">return</span> <span class="n">all_destination_file_pairs_pc</span>
<div class="viewcode-block" id="BigQueryBatchFileLoads.expand"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="n">p</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span>
<span class="n">temp_location</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">temp_location</span>
<span class="n">load_job_name_pcv</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span>
<span class="n">p</span>
<span class="o">|</span> <span class="s2">&quot;ImpulseJobName&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="n">_generate_load_job_name</span><span class="p">()))</span>
<span class="n">file_prefix_pcv</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span>
<span class="n">p</span>
<span class="o">|</span> <span class="s2">&quot;CreateFilePrefixView&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="s1">&#39;&#39;</span><span class="p">])</span>
<span class="o">|</span> <span class="s2">&quot;GenerateFilePrefix&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
<span class="n">file_prefix_generator</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_validate</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="p">,</span>
<span class="n">temp_location</span><span class="p">)))</span>
<span class="n">destination_data_kv_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">pcoll</span>
<span class="o">|</span> <span class="s2">&quot;RewindowIntoGlobal&quot;</span> <span class="o">&gt;&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">_window_fn</span><span class="p">()</span>
<span class="o">|</span> <span class="s2">&quot;AppendDestination&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">bigquery_tools</span><span class="o">.</span><span class="n">AppendDestinationsFn</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">destination</span><span class="p">),</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">table_side_inputs</span><span class="p">))</span>
<span class="n">all_destination_file_pairs_pc</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_write_files</span><span class="p">(</span><span class="n">destination_data_kv_pc</span><span class="p">,</span>
<span class="n">file_prefix_pcv</span><span class="p">)</span>
<span class="n">grouped_files_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">all_destination_file_pairs_pc</span>
<span class="o">|</span> <span class="s2">&quot;GroupFilesByTableDestinations&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">())</span>
<span class="c1"># Load Jobs are triggered to temporary tables, and those are later copied to</span>
<span class="c1"># the actual appropriate destination query. This ensures atomicity when only</span>
<span class="c1"># some of the load jobs would fail but not other.</span>
<span class="c1"># If any of them fails, then copy jobs are not triggered.</span>
<span class="n">trigger_loads_outputs</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">grouped_files_pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">TriggerLoadJobs</span><span class="p">(</span>
<span class="n">schema</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">,</span>
<span class="n">temporary_tables</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">temp_tables</span><span class="p">,</span>
<span class="n">additional_bq_parameters</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">),</span>
<span class="n">load_job_name_pcv</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span><span class="p">)</span>
<span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span><span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">TEMP_TABLES</span><span class="p">,</span> <span class="n">main</span><span class="o">=</span><span class="s1">&#39;main&#39;</span><span class="p">)</span>
<span class="p">)</span>
<span class="n">destination_job_ids_pc</span> <span class="o">=</span> <span class="n">trigger_loads_outputs</span><span class="p">[</span><span class="s1">&#39;main&#39;</span><span class="p">]</span>
<span class="n">temp_tables_pc</span> <span class="o">=</span> <span class="n">trigger_loads_outputs</span><span class="p">[</span><span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">TEMP_TABLES</span><span class="p">]</span>
<span class="n">destination_copy_job_ids_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">p</span>
<span class="o">|</span> <span class="s2">&quot;ImpulseMonitorLoadJobs&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span>
<span class="o">|</span> <span class="s2">&quot;WaitForLoadJobs&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">WaitForBQJobs</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">),</span>
<span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">AsList</span><span class="p">(</span><span class="n">destination_job_ids_pc</span><span class="p">))</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">TriggerCopyJobs</span><span class="p">(</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span>
<span class="n">temporary_tables</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">temp_tables</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">),</span> <span class="n">load_job_name_pcv</span><span class="p">))</span>
<span class="n">finished_copy_jobs_pc</span> <span class="o">=</span> <span class="p">(</span><span class="n">p</span>
<span class="o">|</span> <span class="s2">&quot;ImpulseMonitorCopyJobs&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span>
<span class="o">|</span> <span class="s2">&quot;WaitForCopyJobs&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">WaitForBQJobs</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">),</span>
<span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">AsList</span><span class="p">(</span><span class="n">destination_copy_job_ids_pc</span><span class="p">)</span>
<span class="p">))</span>
<span class="n">_</span> <span class="o">=</span> <span class="p">(</span><span class="n">finished_copy_jobs_pc</span>
<span class="o">|</span> <span class="s2">&quot;RemoveTempTables/PassTables&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">deleting_tables</span><span class="p">:</span> <span class="n">deleting_tables</span><span class="p">,</span>
<span class="n">pvalue</span><span class="o">.</span><span class="n">AsIter</span><span class="p">(</span><span class="n">temp_tables_pc</span><span class="p">))</span>
<span class="o">|</span> <span class="s2">&quot;RemoveTempTables/AddUselessValue&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="kc">None</span><span class="p">))</span>
<span class="o">|</span> <span class="s2">&quot;RemoveTempTables/DeduplicateTables&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span>
<span class="o">|</span> <span class="s2">&quot;RemoveTempTables/GetTableNames&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">elm</span><span class="p">:</span> <span class="n">elm</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="o">|</span> <span class="s2">&quot;RemoveTempTables/Delete&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">DeleteTablesFn</span><span class="p">()))</span>
<span class="k">return</span> <span class="p">{</span>
<span class="bp">self</span><span class="o">.</span><span class="n">DESTINATION_JOBID_PAIRS</span><span class="p">:</span> <span class="n">destination_job_ids_pc</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">DESTINATION_FILE_PAIRS</span><span class="p">:</span> <span class="n">all_destination_file_pairs_pc</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">DESTINATION_COPY_JOBID_PAIRS</span><span class="p">:</span> <span class="n">destination_copy_job_ids_pc</span><span class="p">,</span>
<span class="p">}</span></div></div>
</pre></div>
</div>
<div class="articleComments">
</div>
</div>
<footer>
<hr>
<div role="contentinfo">
<p>
<!-- NOTE(review): the copyright holder is empty in the generated output; presumably the Sphinx "copyright" setting is unset. Confirm and fill in. -->
&copy; Copyright .
</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<!-- Page-level options for the Sphinx client-side helpers; presumably read by doctools.js (loaded below). Confirm against the Sphinx version in use. -->
<script>
var DOCUMENTATION_OPTIONS = {
URL_ROOT:'../../../../',
VERSION:'',
COLLAPSE_INDEX:false,
FILE_SUFFIX:'.html',
HAS_SOURCE: true,
SOURCELINK_SUFFIX: '.txt'
};
</script>
<!-- Keep this order. The inline init below calls jQuery and SphinxRtdTheme, so jquery.js and (presumably) theme.js must load before it. -->
<script src="../../../../_static/jquery.js"></script>
<script src="../../../../_static/underscore.js"></script>
<script src="../../../../_static/doctools.js"></script>
<script src="../../../../_static/js/theme.js"></script>
<!-- Once the DOM is ready, turn on the theme's sticky side navigation. -->
<script>
jQuery(function () {
SphinxRtdTheme.StickyNav.enable();
});
</script>
</body>
</html>