| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <title>apache_beam.io.gcp.bigquery_file_loads — Apache Beam documentation</title> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <link rel="stylesheet" href="../../../../_static/css/theme.css" type="text/css" /> |
| |
| |
| |
| |
| |
| <link rel="index" title="Index" |
| href="../../../../genindex.html"/> |
| <link rel="search" title="Search" href="../../../../search.html"/> |
| <link rel="top" title="Apache Beam documentation" href="../../../../index.html"/> |
| <link rel="up" title="Module code" href="../../../index.html"/> |
| |
| |
| <script src="../../../../_static/js/modernizr.min.js"></script> |
| |
| </head> |
| |
| <body class="wy-body-for-nav" role="document"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search"> |
| |
| |
| |
| <a href="../../../../index.html" class="icon icon-home"> Apache Beam |
| |
| |
| |
| </a> |
| |
| |
| |
| |
| |
| |
| |
| <div role="search"> |
| <form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get"> |
| <input type="text" name="q" placeholder="Search docs" /> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| </div> |
| |
| |
| </div> |
| |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| |
| |
| |
| |
| |
| |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.internal.html">apache_beam.internal package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.io.html">apache_beam.io package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.options.html">apache_beam.options package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.testing.html">apache_beam.testing package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.tools.html">apache_beam.tools package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
| </ul> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.error.html">apache_beam.error module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.version.html">apache_beam.version module</a></li> |
| </ul> |
| |
| |
| |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" role="navigation" aria-label="top navigation"> |
| |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="../../../../index.html">Apache Beam</a> |
| |
| </nav> |
| |
| |
| |
| <div class="wy-nav-content"> |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| |
| <ul class="wy-breadcrumbs"> |
| |
| <li><a href="../../../../index.html">Docs</a> »</li> |
| |
| <li><a href="../../../index.html">Module code</a> »</li> |
| |
| <li>apache_beam.io.gcp.bigquery_file_loads</li> |
| |
| |
| <li class="wy-breadcrumbs-aside"> |
| |
| |
| |
| </li> |
| |
| </ul> |
| |
| |
| <hr/> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.io.gcp.bigquery_file_loads</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="sd">"""</span> |
| <span class="sd">Functionality to perform file loads into BigQuery for Batch and Streaming</span> |
| <span class="sd">pipelines.</span> |
| |
| <span class="sd">This source is able to work around BigQuery load quotas and limitations. When</span> |
| <span class="sd">destinations are dynamic, or when data for a single job is too large, the data</span> |
| <span class="sd">will be split into multiple jobs.</span> |
| |
| <span class="sd">NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.</span> |
| <span class="sd">"""</span> |
| |
| <span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">absolute_import</span> |
| |
| <span class="kn">import</span> <span class="nn">datetime</span> |
| <span class="kn">import</span> <span class="nn">hashlib</span> |
| <span class="kn">import</span> <span class="nn">itertools</span> |
| <span class="kn">import</span> <span class="nn">logging</span> |
| <span class="kn">import</span> <span class="nn">random</span> |
| <span class="kn">import</span> <span class="nn">time</span> |
| <span class="kn">import</span> <span class="nn">uuid</span> |
| |
| <span class="kn">from</span> <span class="nn">future.utils</span> <span class="k">import</span> <span class="n">iteritems</span> |
| |
| <span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span> |
| <span class="kn">from</span> <span class="nn">apache_beam</span> <span class="k">import</span> <span class="n">pvalue</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io</span> <span class="k">import</span> <span class="n">filesystems</span> <span class="k">as</span> <span class="n">fs</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp</span> <span class="k">import</span> <span class="n">bigquery_tools</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.options</span> <span class="k">import</span> <span class="n">value_provider</span> <span class="k">as</span> <span class="n">vp</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="k">import</span> <span class="n">GoogleCloudOptions</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="k">import</span> <span class="n">trigger</span> |
| |
| <span class="n">ONE_TERABYTE</span> <span class="o">=</span> <span class="p">(</span><span class="mi">1</span> <span class="o"><<</span> <span class="mi">40</span><span class="p">)</span> |
| |
| <span class="c1"># The maximum file size for imports is 5TB. We keep our files under that.</span> |
| <span class="n">_DEFAULT_MAX_FILE_SIZE</span> <span class="o">=</span> <span class="mi">4</span> <span class="o">*</span> <span class="n">ONE_TERABYTE</span> |
| |
| <span class="n">_DEFAULT_MAX_WRITERS_PER_BUNDLE</span> <span class="o">=</span> <span class="mi">20</span> |
| |
| <span class="c1"># The maximum size for a single load job is 15 terabytes.</span> |
| <span class="n">_MAXIMUM_LOAD_SIZE</span> <span class="o">=</span> <span class="mi">15</span> <span class="o">*</span> <span class="n">ONE_TERABYTE</span> |
| |
| <span class="c1"># BigQuery only supports up to 10 thousand URIs for a single load job.</span> |
| <span class="n">_MAXIMUM_SOURCE_URIS</span> <span class="o">=</span> <span class="mi">10</span><span class="o">*</span><span class="mi">1000</span> |
| |
| <span class="c1"># If triggering_frequency is supplied, we will trigger the file write after</span> |
| <span class="c1"># this many records are written.</span> |
| <span class="n">_FILE_TRIGGERING_RECORD_COUNT</span> <span class="o">=</span> <span class="mi">500000</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_generate_load_job_name</span><span class="p">():</span> |
| <span class="n">datetime_component</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s2">"%Y_%m_</span><span class="si">%d</span><span class="s2">_%H%M%S"</span><span class="p">)</span> |
| <span class="c1"># TODO(pabloem): include job id / pipeline component?</span> |
| <span class="k">return</span> <span class="s1">'beam_load_</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span><span class="n">datetime_component</span><span class="p">,</span> <span class="n">random</span><span class="o">.</span><span class="n">randint</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="mi">100</span><span class="p">))</span> |
| |
| |
| <div class="viewcode-block" id="file_prefix_generator"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.file_prefix_generator">[docs]</a><span class="k">def</span> <span class="nf">file_prefix_generator</span><span class="p">(</span><span class="n">with_validation</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">pipeline_gcs_location</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">temp_location</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">_generate_file_prefix</span><span class="p">(</span><span class="n">unused_elm</span><span class="p">):</span> |
| <span class="c1"># If a gcs location is provided to the pipeline, then we shall use that.</span> |
| <span class="c1"># Otherwise, we shall use the temp_location from pipeline options.</span> |
| <span class="n">gcs_base</span> <span class="o">=</span> <span class="n">pipeline_gcs_location</span><span class="o">.</span><span class="n">get</span><span class="p">()</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">gcs_base</span><span class="p">:</span> |
| <span class="n">gcs_base</span> <span class="o">=</span> <span class="n">temp_location</span> |
| |
| <span class="c1"># This will fail at pipeline execution time, but will fail early, as this</span> |
| <span class="c1"># step doesn't have any dependencies (and thus will be one of the first</span> |
| <span class="c1"># stages to be run).</span> |
| <span class="k">if</span> <span class="n">with_validation</span> <span class="ow">and</span> <span class="p">(</span><span class="ow">not</span> <span class="n">gcs_base</span> <span class="ow">or</span> <span class="ow">not</span> <span class="n">gcs_base</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">'gs://'</span><span class="p">)):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'Invalid GCS location: </span><span class="si">%r</span><span class="s1">.</span><span class="se">\n</span><span class="s1">'</span> |
| <span class="s1">'Writing to BigQuery with FILE_LOADS method requires a '</span> |
| <span class="s1">'GCS location to be provided to write files to be loaded'</span> |
| <span class="s1">' loaded into BigQuery. Please provide a GCS bucket, or '</span> |
| <span class="s1">'pass method="STREAMING_INSERTS" to WriteToBigQuery.'</span> |
| <span class="o">%</span> <span class="n">gcs_base</span><span class="p">)</span> |
| |
| <span class="n">prefix_uuid</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">()</span> |
| <span class="k">return</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">gcs_base</span><span class="p">,</span> <span class="s1">'bq_load'</span><span class="p">,</span> <span class="n">prefix_uuid</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">_generate_file_prefix</span></div> |
| |
| |
| <span class="k">def</span> <span class="nf">_make_new_file_writer</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">):</span> |
| <span class="n">destination</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">get_hashable_destination</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span> |
| |
| <span class="c1"># Windows does not allow : in filenames. Replacing with a period.</span> |
| <span class="c1"># Other disallowed characters are:</span> |
| <span class="c1"># https://docs.microsoft.com/en-us/windows/desktop/fileio/naming-a-file</span> |
| <span class="n">destination</span> <span class="o">=</span> <span class="n">destination</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">':'</span><span class="p">,</span> <span class="s1">'.'</span><span class="p">)</span> |
| |
| <span class="n">directory</span> <span class="o">=</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">exists</span><span class="p">(</span><span class="n">directory</span><span class="p">):</span> |
| <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">mkdirs</span><span class="p">(</span><span class="n">directory</span><span class="p">)</span> |
| |
| <span class="n">file_name</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span> |
| <span class="n">file_path</span> <span class="o">=</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">,</span> <span class="n">file_name</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">file_path</span><span class="p">,</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">create</span><span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="s1">'application/text'</span><span class="p">)</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_bq_uuid</span><span class="p">(</span><span class="n">seed</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">seed</span><span class="p">:</span> |
| <span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s2">"-"</span><span class="p">,</span> <span class="s2">""</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="n">hashlib</span><span class="o">.</span><span class="n">md5</span><span class="p">(</span><span class="n">seed</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf8'</span><span class="p">))</span><span class="o">.</span><span class="n">hexdigest</span><span class="p">())</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_ShardDestinations</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""Adds a shard number to the key of the KV element.</span> |
| |
| <span class="sd"> Experimental; no backwards compatibility guarantees."""</span> |
| <span class="n">DEFAULT_SHARDING_FACTOR</span> <span class="o">=</span> <span class="mi">10</span> |
| |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sharding_factor</span><span class="o">=</span><span class="n">DEFAULT_SHARDING_FACTOR</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sharding_factor</span> <span class="o">=</span> <span class="n">sharding_factor</span> |
| |
| <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_shard_count</span> <span class="o">=</span> <span class="n">random</span><span class="o">.</span><span class="n">randrange</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sharding_factor</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="n">row</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> |
| |
| <span class="n">sharded_destination</span> <span class="o">=</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_shard_count</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">sharding_factor</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_shard_count</span> <span class="o">+=</span> <span class="mi">1</span> |
| <span class="k">yield</span> <span class="p">(</span><span class="n">sharded_destination</span><span class="p">,</span> <span class="n">row</span><span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="WriteRecordsToFile"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile">[docs]</a><span class="k">class</span> <span class="nc">WriteRecordsToFile</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""Write input records to files before triggering a load job.</span> |
| |
| <span class="sd"> This transform keeps up to ``max_files_per_bundle`` files open to write to. It</span> |
| <span class="sd"> receives (destination, record) tuples, and it writes the records to different</span> |
| <span class="sd"> files for each destination.</span> |
| |
| <span class="sd"> If there are more than ``max_files_per_bundle`` destinations that we need to</span> |
| <span class="sd"> write to, then those records are grouped by their destination, and later</span> |
| <span class="sd"> written to files by ``WriteGroupedRecordsToFile``.</span> |
| |
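| <span class="sd"> It outputs two tagged PCollections: ``WrittenFiles`` with (destination, file</span> |
| <span class="sd"> path) pairs, and ``UnwrittenRecords`` with the elements that were spilled.</span> |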
| <span class="sd"> """</span> |
| |
| <span class="n">UNWRITTEN_RECORD_TAG</span> <span class="o">=</span> <span class="s1">'UnwrittenRecords'</span> |
| <span class="n">WRITTEN_FILE_TAG</span> <span class="o">=</span> <span class="s1">'WrittenFiles'</span> |
| |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> |
| <span class="n">max_files_per_bundle</span><span class="o">=</span><span class="n">_DEFAULT_MAX_WRITERS_PER_BUNDLE</span><span class="p">,</span> |
| <span class="n">max_file_size</span><span class="o">=</span><span class="n">_DEFAULT_MAX_FILE_SIZE</span><span class="p">,</span> |
| <span class="n">coder</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""Initialize a :class:`WriteRecordsToFile`.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> max_files_per_bundle (int): The maximum number of files that can be kept</span> |
| <span class="sd"> open during execution of this step in a worker. This is to avoid</span> |
| <span class="sd"> overwhelming the worker memory.</span> |
| <span class="sd"> max_file_size (int): The maximum size in bytes for a file to be used in</span> |
| <span class="sd"> a load job.</span> |
| |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span> <span class="o">=</span> <span class="n">max_files_per_bundle</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span> <span class="o">=</span> <span class="n">max_file_size</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">coder</span> <span class="o">=</span> <span class="n">coder</span> <span class="ow">or</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">RowAsDictJsonCoder</span><span class="p">()</span> |
| |
| <div class="viewcode-block" id="WriteRecordsToFile.display_data"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="s1">'max_files_per_bundle'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span><span class="p">,</span> |
| <span class="s1">'max_file_size'</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">),</span> |
| <span class="s1">'coder'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">coder</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span> |
| <span class="p">}</span></div> |
| |
| <div class="viewcode-block" id="WriteRecordsToFile.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span> <span class="o">=</span> <span class="p">{}</span></div> |
| |
| <div class="viewcode-block" id="WriteRecordsToFile.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">file_prefix</span><span class="p">):</span> |
| <span class="sd">"""Take a tuple with (destination, row) and write to file or spill out.</span> |
| |
| <span class="sd"> Destination may be a ``TableReference`` or a string, and row is a</span> |
| <span class="sd"> Python dictionary for a row to be inserted to BigQuery."""</span> |
| <span class="n">destination</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">get_hashable_destination</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> |
| <span class="n">row</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> |
| |
| <span class="k">if</span> <span class="n">destination</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">:</span> |
| <span class="n">writer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">[</span><span class="n">destination</span><span class="p">]</span> |
| <span class="k">elif</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">)</span> <span class="o"><</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span><span class="p">:</span> |
| <span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span><span class="p">)</span> <span class="o">=</span> <span class="n">_make_new_file_writer</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">[</span><span class="n">destination</span><span class="p">]</span> <span class="o">=</span> <span class="n">writer</span> |
| <span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span><span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">,</span> |
| <span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">file_path</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span> |
| <span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">UNWRITTEN_RECORD_TAG</span><span class="p">,</span> <span class="n">element</span><span class="p">)</span> |
| <span class="k">return</span> |
| |
| <span class="c1"># TODO(pabloem): Is it possible for this to throw exception?</span> |
| <span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">row</span><span class="p">))</span> |
| <span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="sa">b</span><span class="s1">'</span><span class="se">\n</span><span class="s1">'</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">writer</span><span class="o">.</span><span class="n">tell</span><span class="p">()</span> <span class="o">></span> <span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">:</span> |
| <span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="WriteRecordsToFile.finish_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.finish_bundle">[docs]</a> <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">for</span> <span class="n">_</span><span class="p">,</span> <span class="n">writer</span> <span class="ow">in</span> <span class="n">iteritems</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">):</span> |
| <span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span> <span class="o">=</span> <span class="p">{}</span></div></div> |
| |
| |
| <!-- Listing block for the WriteGroupedRecordsToFile class (a beam.DoFn) with its nested process block. The listed process method reads a (destination, rows) element, opens a writer through _make_new_file_writer whenever none is open, yields (destination, file_path) for each file it opens, writes every row encoded with self.coder followed by a newline byte, and closes the writer once writer.tell() exceeds self.max_file_size. NOTE(review): the listing shows no close call for a writer that is still open after the last row; confirm against the upstream Python module. --><div class="viewcode-block" id="WriteGroupedRecordsToFile"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteGroupedRecordsToFile">[docs]</a><span class="k">class</span> <span class="nc">WriteGroupedRecordsToFile</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""Receives collection of dest-iterable(records), writes it to files.</span> |
| |
| <span class="sd"> This is different from ``WriteRecordsToFile`` because it receives records</span> |
| <span class="sd"> grouped by destination. This means that it's not necessary to keep multiple</span> |
| <span class="sd"> file descriptors open, because we know for sure when records for a single</span> |
| <span class="sd"> destination have been written out.</span> |
| |
| <span class="sd"> Experimental; no backwards compatibility guarantees.</span> |
| <span class="sd"> """</span> |
| |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">max_file_size</span><span class="o">=</span><span class="n">_DEFAULT_MAX_FILE_SIZE</span><span class="p">,</span> |
| <span class="n">coder</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span> <span class="o">=</span> <span class="n">max_file_size</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">coder</span> <span class="o">=</span> <span class="n">coder</span> <span class="ow">or</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">RowAsDictJsonCoder</span><span class="p">()</span> |
| |
| <div class="viewcode-block" id="WriteGroupedRecordsToFile.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteGroupedRecordsToFile.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">file_prefix</span><span class="p">):</span> |
| <span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="n">rows</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> |
| |
| <span class="n">writer</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">for</span> <span class="n">row</span> <span class="ow">in</span> <span class="n">rows</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">writer</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span><span class="p">)</span> <span class="o">=</span> <span class="n">_make_new_file_writer</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">)</span> |
| <span class="k">yield</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">file_path</span><span class="p">)</span> |
| |
| <span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">row</span><span class="p">))</span> |
| <span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="sa">b</span><span class="s1">'</span><span class="se">\n</span><span class="s1">'</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">writer</span><span class="o">.</span><span class="n">tell</span><span class="p">()</span> <span class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">:</span> |
| <span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="n">writer</span> <span class="o">=</span> <span class="kc">None</span></div></div> |
| |
| |
| <!-- Listing block for the TriggerCopyJobs class (a beam.DoFn) with nested start_bundle and process blocks. Listed behaviour: start_bundle builds a BigQueryWrapper from self.test_client; process returns without output when self.temporary_tables is falsy, otherwise it parses the destination into the copy target, parses it again for the copy source and replaces that tableId with job_reference.jobId, fills a missing projectId from the 'project' runtime value, calls bq_wrapper._insert_copy_job, and yields (destination, job_reference) for the job it inserted. --><div class="viewcode-block" id="TriggerCopyJobs"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs">[docs]</a><span class="k">class</span> <span class="nc">TriggerCopyJobs</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""Launches jobs to copy from temporary tables into the main target table.</span> |
| |
| <span class="sd"> When a job needs to write to multiple destination tables, or when a single</span> |
| <span class="sd"> destination table needs to have multiple load jobs to write to it, files are</span> |
| <span class="sd"> loaded into temporary tables, and those tables are later copied to the</span> |
| <span class="sd"> destination tables.</span> |
| |
| <span class="sd"> This transform emits (destination, job_reference) pairs.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">temporary_tables</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="n">create_disposition</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span> <span class="o">=</span> <span class="n">temporary_tables</span> |
| |
| <div class="viewcode-block" id="TriggerCopyJobs.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="TriggerCopyJobs.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">job_name_prefix</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="n">job_reference</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span><span class="p">:</span> |
| <span class="c1"># If we did not use temporary tables, then we do not need to trigger any</span> |
| <span class="c1"># copy jobs.</span> |
| <span class="k">return</span> |
| |
| <span class="n">copy_to_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span><span class="s1">'project'</span><span class="p">,</span> |
| <span class="nb">str</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> |
| |
| <span class="n">copy_from_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span> |
| <span class="n">copy_from_reference</span><span class="o">.</span><span class="n">tableId</span> <span class="o">=</span> <span class="n">job_reference</span><span class="o">.</span><span class="n">jobId</span> |
| <span class="k">if</span> <span class="n">copy_from_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">copy_from_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span> |
| <span class="s1">'project'</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> |
| |
| <span class="n">copy_job_name</span> <span class="o">=</span> <span class="s1">'</span><span class="si">%s</span><span class="s1">_copy_</span><span class="si">%s</span><span class="s1">_to_</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span> |
| <span class="n">job_name_prefix</span><span class="p">,</span><!-- NOTE(review): job_name_prefix defaults to None in the listed signature, so a call without it formats the text None into the copy job name; confirm callers always pass a prefix. --> |
| <span class="n">_bq_uuid</span><span class="p">(</span><span class="s1">'</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">.</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span><span class="n">copy_from_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span> |
| <span class="n">copy_from_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span> |
| <span class="n">copy_from_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)),</span> |
| <span class="n">_bq_uuid</span><span class="p">(</span><span class="s1">'</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">.</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span><span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span> |
| <span class="n">copy_to_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span> |
| <span class="n">copy_to_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)))</span> |
| |
| <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Triggering copy job from </span><span class="si">%s</span><span class="s2"> to </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> |
| <span class="n">copy_from_reference</span><span class="p">,</span> <span class="n">copy_to_reference</span><span class="p">)</span> |
| <span class="n">job_reference</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">_insert_copy_job</span><span class="p">(</span> |
| <span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span> |
| <span class="n">copy_job_name</span><span class="p">,</span> |
| <span class="n">copy_from_reference</span><span class="p">,</span> |
| <span class="n">copy_to_reference</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">)</span> |
| |
| <span class="k">yield</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span><span class="p">)</span></div></div> |
| |
| |
| <!-- Listing block for the TriggerLoadJobs class (a beam.DoFn) with nested display_data, start_bundle and process blocks. Listed behaviour: __init__ stores None for both dispositions when temporary_tables is truthy and the caller's values otherwise; display_data returns the two dispositions and the schema as strings; process resolves schema and additional_bq_parameters (callable, ValueProvider, or plain value), then takes the element's files in batches of at most _MAXIMUM_SOURCE_URIS, calls bq_wrapper.perform_load_job for each batch and yields (destination, job_reference); when temporary_tables is truthy it first sets the table id to the job name and emits the table reference on the TEMP_TABLES tagged output. --><div class="viewcode-block" id="TriggerLoadJobs"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs">[docs]</a><span class="k">class</span> <span class="nc">TriggerLoadJobs</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""Triggers the import jobs to BQ.</span> |
| |
| <span class="sd"> Experimental; no backwards compatibility guarantees.</span> |
| <span class="sd"> """</span> |
| |
| <span class="n">TEMP_TABLES</span> <span class="o">=</span> <span class="s1">'TemporaryTables'</span> |
| |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> |
| <span class="n">schema</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">temporary_tables</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">additional_bq_parameters</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span> <span class="o">=</span> <span class="n">temporary_tables</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span> <span class="o">=</span> <span class="n">additional_bq_parameters</span> <span class="ow">or</span> <span class="p">{}</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span><span class="p">:</span> |
| <span class="c1"># If we are loading into temporary tables, we rely on the default create</span> |
| <span class="c1"># and write dispositions, which mean that a new table will be created.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="n">create_disposition</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span> |
| |
| <div class="viewcode-block" id="TriggerLoadJobs.display_data"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="p">{</span><span class="s1">'create_disposition'</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">),</span> |
| <span class="s1">'write_disposition'</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">)}</span> |
| <span class="n">result</span><span class="p">[</span><span class="s1">'schema'</span><span class="p">]</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">result</span></div> |
| |
| <div class="viewcode-block" id="TriggerLoadJobs.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="TriggerLoadJobs.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">load_job_name_prefix</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">):</span> |
| <span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="n">files</span> <span class="o">=</span> <span class="nb">iter</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span> |
| |
| <span class="k">if</span> <span class="n">callable</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">):</span> |
| <span class="n">schema</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span> |
| <span class="n">schema</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="o">.</span><span class="n">get</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">schema</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span> |
| |
| <span class="k">if</span> <span class="n">callable</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">):</span> |
| <span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span> |
| <span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="o">.</span><span class="n">get</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span> |
| |
| <span class="n">batch_of_files</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">itertools</span><span class="o">.</span><span class="n">islice</span><span class="p">(</span><span class="n">files</span><span class="p">,</span> <span class="n">_MAXIMUM_SOURCE_URIS</span><span class="p">))</span> |
| <span class="k">while</span> <span class="n">batch_of_files</span><span class="p">:</span> |
| |
| <span class="n">table_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span> |
| <span class="s1">'project'</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> |
| <span class="c1"># Load jobs for a single destination are always triggered from the same</span> |
| <span class="c1"># worker. This means that we can generate a deterministic numbered job id,</span> |
| <span class="c1"># and not need to worry.</span> |
| <span class="n">destination_hash</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">(</span><span class="s1">'</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">.</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span><span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">))</span> |
| <span class="n">timestamp</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">())</span><!-- NOTE(review): the job name is built from prefix, destination hash and this whole-second timestamp, so two batches for one destination started within the same second would get the same job name; the comment above speaks of a numbered id but the listing shows no counter. Verify against the upstream Python module. --> |
| <span class="n">job_name</span> <span class="o">=</span> <span class="s1">'</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span> |
| <span class="n">load_job_name_prefix</span><span class="p">,</span> <span class="n">destination_hash</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">)</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'Batch of files has </span><span class="si">%s</span><span class="s1"> files. Job name is </span><span class="si">%s</span><span class="s1">.'</span><span class="p">,</span> |
| <span class="nb">len</span><span class="p">(</span><span class="n">batch_of_files</span><span class="p">),</span> <span class="n">job_name</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span><span class="p">:</span> |
| <span class="c1"># For temporary tables, we create a new table with the name with JobId.</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span> <span class="o">=</span> <span class="n">job_name</span> |
| <span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span><span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">TEMP_TABLES</span><span class="p">,</span> <span class="n">table_reference</span><span class="p">)</span> |
| |
| <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Triggering job </span><span class="si">%s</span><span class="s1"> to load data to BigQuery table </span><span class="si">%s</span><span class="s1">.'</span><!-- NOTE(review): this literal and the one on the next line are adjacent string literals, so the logged text runs the full stop straight into the word Schema with no space between them. --> |
| <span class="s1">'Schema: </span><span class="si">%s</span><span class="s1">. Additional parameters: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">job_name</span><span class="p">,</span> <span class="n">table_reference</span><span class="p">,</span> |
| <span class="n">schema</span><span class="p">,</span> <span class="n">additional_parameters</span><span class="p">)</span> |
| <span class="n">job_reference</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">perform_load_job</span><span class="p">(</span> |
| <span class="n">table_reference</span><span class="p">,</span> <span class="n">batch_of_files</span><span class="p">,</span> <span class="n">job_name</span><span class="p">,</span> |
| <span class="n">schema</span><span class="o">=</span><span class="n">schema</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span> |
| <span class="n">additional_load_parameters</span><span class="o">=</span><span class="n">additional_parameters</span><span class="p">)</span> |
| <span class="k">yield</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span><span class="p">)</span> |
| |
| <span class="c1"># Prepare to trigger the next job</span> |
| <span class="n">batch_of_files</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">itertools</span><span class="o">.</span><span class="n">islice</span><span class="p">(</span><span class="n">files</span><span class="p">,</span> <span class="n">_MAXIMUM_SOURCE_URIS</span><span class="p">))</span></div></div> |
| |
| |
| <div class="viewcode-block" id="WaitForBQJobs"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WaitForBQJobs">[docs]</a><span class="k">class</span> <span class="nc">WaitForBQJobs</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""Takes in a series of BQ job names as side input, and waits for all of them.</span> |
| |
| <span class="sd"> If any job fails, it will fail. If all jobs succeed, it will succeed.</span> |
| |
| <span class="sd"> Experimental; no backwards compatibility guarantees.</span> |
| <span class="sd"> """</span> |
| <span class="n">ALL_DONE</span> <span class="o">=</span> <span class="nb">object</span><span class="p">()</span> |
| <span class="n">FAILED</span> <span class="o">=</span> <span class="nb">object</span><span class="p">()</span> |
| <span class="n">WAITING</span> <span class="o">=</span> <span class="nb">object</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">test_client</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span> |
| |
| <!-- Listing block for WaitForBQJobs.start_bundle: the listed method assigns self.bq_wrapper a BigQueryWrapper constructed with client=self.test_client. --><div class="viewcode-block" id="WaitForBQJobs.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WaitForBQJobs.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span></div> |
| |
| <!-- Listing block for WaitForBQJobs.process: the listed method collects the second item of every entry in dest_ids_list as job references, then loops: it calls self._check_job_states, raises on FAILED, returns dest_ids_list on ALL_DONE, and otherwise sleeps 10 seconds before checking again. --><div class="viewcode-block" id="WaitForBQJobs.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WaitForBQJobs.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">dest_ids_list</span><span class="p">):</span> |
| <span class="n">job_references</span> <span class="o">=</span> <span class="p">[</span><span class="n">elm</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> <span class="k">for</span> <span class="n">elm</span> <span class="ow">in</span> <span class="n">dest_ids_list</span><span class="p">]</span> |
| |
| <span class="k">while</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="n">status</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_check_job_states</span><span class="p">(</span><span class="n">job_references</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">status</span> <span class="o">==</span> <span class="n">WaitForBQJobs</span><span class="o">.</span><span class="n">FAILED</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">Exception</span><span class="p">(</span> |
| <span class="s1">'BigQuery jobs failed. BQ error: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_latest_error</span><span class="p">)</span><!-- NOTE(review): the format string and self._latest_error are passed to Exception as two separate arguments, so the placeholder is never interpolated and the exception carries a two-item args tuple; the upstream Python module likely intended the percent operator here. --> |
| <span class="k">elif</span> <span class="n">status</span> <span class="o">==</span> <span class="n">WaitForBQJobs</span><span class="o">.</span><span class="n">ALL_DONE</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">dest_ids_list</span> <span class="c1"># Pass the list of destination-jobs downstream</span> |
| <span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="mi">10</span><span class="p">)</span></div> |
| |
| <!-- WaitForBQJobs._check_job_states: fetches every referenced job through self.bq_wrapper.get_job and logs its status. A job in state DONE with an errorResult records job.status in self._latest_error and yields FAILED; a job in state DONE without an error moves on to the next reference; any other state yields WAITING so that the caller keeps polling instead of treating unfinished jobs as complete. ALL_DONE is returned only after the loop has seen every job finished. Indentation is not visible in this listing: the else branch belongs to the if/elif chain inside the for loop, and the final return sits after the loop. logging.warning replaces the deprecated logging.warn alias. --><span class="k">def</span> <span class="nf">_check_job_states</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">job_references</span><span class="p">):</span> |
| <span class="k">for</span> <span class="n">ref</span> <span class="ow">in</span> <span class="n">job_references</span><span class="p">:</span> |
| <span class="n">job</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">get_job</span><span class="p">(</span><span class="n">ref</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span> |
| <span class="n">ref</span><span class="o">.</span><span class="n">jobId</span><span class="p">,</span> |
| <span class="n">ref</span><span class="o">.</span><span class="n">location</span><span class="p">)</span> |
| |
| <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Job status: </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">job</span><span class="o">.</span><span class="n">status</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">job</span><span class="o">.</span><span class="n">status</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="s1">'DONE'</span> <span class="ow">and</span> <span class="n">job</span><span class="o">.</span><span class="n">status</span><span class="o">.</span><span class="n">errorResult</span><span class="p">:</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">"Job </span><span class="si">%s</span><span class="s2"> seems to have failed. Error Result: </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> |
| <span class="n">ref</span><span class="o">.</span><span class="n">jobId</span><span class="p">,</span> <span class="n">job</span><span class="o">.</span><span class="n">status</span><span class="o">.</span><span class="n">errorResult</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_latest_error</span> <span class="o">=</span> <span class="n">job</span><span class="o">.</span><span class="n">status</span> |
| <span class="k">return</span> <span class="n">WaitForBQJobs</span><span class="o">.</span><span class="n">FAILED</span> |
| <span class="k">elif</span> <span class="n">job</span><span class="o">.</span><span class="n">status</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="s1">'DONE'</span><span class="p">:</span> |
| <span class="k">continue</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">WaitForBQJobs</span><span class="o">.</span><span class="n">WAITING</span> |
| |
| <span class="k">return</span> <span class="n">WaitForBQJobs</span><span class="o">.</span><span class="n">ALL_DONE</span></div> |
| |
| |
| <!-- DeleteTablesFn: a beam.DoFn listing. __init__ stores an optional test_client; start_bundle wraps it in bigquery_tools.BigQueryWrapper as self.bq_wrapper; process logs the incoming table reference, parses it with bigquery_tools.parse_table_reference and calls self.bq_wrapper._delete_table with the parsed projectId, datasetId and tableId. Each nested viewcode-block div carries the anchor id that the API reference pages link to. --><div class="viewcode-block" id="DeleteTablesFn"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn">[docs]</a><span class="k">class</span> <span class="nc">DeleteTablesFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span> |
| |
| <div class="viewcode-block" id="DeleteTablesFn.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="DeleteTablesFn.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">table_reference</span><span class="p">):</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Deleting table </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">table_reference</span><span class="p">)</span> |
| <span class="n">table_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">table_reference</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">_delete_table</span><span class="p">(</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)</span></div></div> |
| |
| |
| <div class="viewcode-block" id="BigQueryBatchFileLoads"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads">[docs]</a><span class="k">class</span> <span class="nc">BigQueryBatchFileLoads</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="sd">"""Takes in a set of elements, and inserts them to BigQuery via batch loads.</span> |
| |
| <span class="sd"> """</span> |
| |
| <span class="n">DESTINATION_JOBID_PAIRS</span> <span class="o">=</span> <span class="s1">'destination_load_jobid_pairs'</span> |
| <span class="n">DESTINATION_FILE_PAIRS</span> <span class="o">=</span> <span class="s1">'destination_file_pairs'</span> |
| <span class="n">DESTINATION_COPY_JOBID_PAIRS</span> <span class="o">=</span> <span class="s1">'destination_copy_jobid_pairs'</span> |
| |
| <!-- BigQueryBatchFileLoads.__init__: stores the constructor arguments on the instance. max_file_size and max_files_per_bundle fall back to _DEFAULT_MAX_FILE_SIZE and _DEFAULT_MAX_WRITERS_PER_BUNDLE when falsy. custom_gcs_temp_location is wrapped in vp.StaticValueProvider when it is a str or None (None becomes the empty string), kept as is when it already is a vp.ValueProvider, and rejected with ValueError otherwise. coder defaults to bigquery_tools.RowAsDictJsonCoder(). temp_tables is True only when destination is callable. additional_bq_parameters defaults to an empty dict and the two side input arguments to empty tuples. When validate is truthy, self.verify() runs at the end of construction. --><span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">destination</span><span class="p">,</span> |
| <span class="n">schema</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">custom_gcs_temp_location</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">triggering_frequency</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">coder</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">max_file_size</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">max_files_per_bundle</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">additional_bq_parameters</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">table_side_inputs</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">schema_side_inputs</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">validate</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">is_streaming_pipeline</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">destination</span> <span class="o">=</span> <span class="n">destination</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="n">create_disposition</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span> <span class="o">=</span> <span class="n">triggering_frequency</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span> <span class="o">=</span> <span class="n">max_file_size</span> <span class="ow">or</span> <span class="n">_DEFAULT_MAX_FILE_SIZE</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span> <span class="o">=</span> <span class="p">(</span><span class="n">max_files_per_bundle</span> <span class="ow">or</span> |
| <span class="n">_DEFAULT_MAX_WRITERS_PER_BUNDLE</span><span class="p">)</span> |
| <span class="k">if</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">custom_gcs_temp_location</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span> |
| <span class="ow">or</span> <span class="n">custom_gcs_temp_location</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">StaticValueProvider</span><span class="p">(</span> |
| <span class="nb">str</span><span class="p">,</span> <span class="n">custom_gcs_temp_location</span> <span class="ow">or</span> <span class="s1">''</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">custom_gcs_temp_location</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span> <span class="o">=</span> <span class="n">custom_gcs_temp_location</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'custom_gcs_temp_location must be str or ValueProvider'</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">coder</span> <span class="o">=</span> <span class="n">coder</span> <span class="ow">or</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">RowAsDictJsonCoder</span><span class="p">()</span> |
| |
| <span class="c1"># If we have multiple destinations, then we will have multiple load jobs,</span> |
| <span class="c1"># thus we will need temporary tables for atomicity.</span> |
| <span class="c1"># If the destination is a single one, we assume that we will have only one</span> |
| <span class="c1"># job to run - and thus we avoid using temporary tables</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">temp_tables</span> <span class="o">=</span> <span class="kc">True</span> <span class="k">if</span> <span class="n">callable</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span> <span class="k">else</span> <span class="kc">False</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span> <span class="o">=</span> <span class="n">additional_bq_parameters</span> <span class="ow">or</span> <span class="p">{}</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">table_side_inputs</span> <span class="o">=</span> <span class="n">table_side_inputs</span> <span class="ow">or</span> <span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span> <span class="o">=</span> <span class="n">schema_side_inputs</span> <span class="ow">or</span> <span class="p">()</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="o">=</span> <span class="n">is_streaming_pipeline</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_validate</span> <span class="o">=</span> <span class="n">validate</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_validate</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">verify</span><span class="p">()</span> |
| |
| <!-- BigQueryBatchFileLoads.verify: construction-time validation that raises ValueError on bad configuration. The GCS check inspects the provider object itself (not the value returned by get()), so it applies to statically known locations and does not call get() on other provider types; an empty value means no custom location was supplied and is accepted, matching the inline comment below. The check is followed by two triggering_frequency checks: it is required when is_streaming_pipeline is set and rejected otherwise. The two triggering_frequency messages carry a space between "file" and "loads", which adjacent string literal concatenation previously dropped. --><div class="viewcode-block" id="BigQueryBatchFileLoads.verify"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads.verify">[docs]</a> <span class="k">def</span> <span class="nf">verify</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="p">,</span> |
| <span class="n">vp</span><span class="o">.</span><span class="n">StaticValueProvider</span><span class="p">)</span> <span class="ow">and</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="o">.</span><span class="n">get</span><span class="p">()</span> <span class="ow">and</span> |
| <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="o">.</span><span class="n">get</span><span class="p">()</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">'gs://'</span><span class="p">)):</span> |
| <span class="c1"># Only fail if the custom location is provided, and it is not a GCS</span> |
| <span class="c1"># location.</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'Invalid GCS location: </span><span class="si">%r</span><span class="s1">.</span><span class="se">\n</span><span class="s1">'</span> |
| <span class="s1">'Writing to BigQuery with FILE_LOADS method requires a '</span> |
| <span class="s1">'GCS location to be provided to write files to be '</span> |
| <span class="s1">'loaded into BigQuery. Please provide a GCS bucket, or '</span> |
| <span class="s1">'pass method="STREAMING_INSERTS" to WriteToBigQuery.'</span> |
| <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="o">.</span><span class="n">get</span><span class="p">())</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'triggering_frequency must be specified to use file '</span> |
| <span class="s1">'loads in streaming'</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'triggering_frequency can only be used with file '</span> |
| <span class="s1">'loads in streaming'</span><span class="p">)</span></div> |
| |
| <!-- BigQueryBatchFileLoads._window_fn: returns a beam.WindowInto over GlobalWindows. When is_streaming_pipeline is set, the transform carries a Repeatedly(AfterAny(AfterProcessingTime(self.triggering_frequency), AfterCount(_FILE_TRIGGERING_RECORD_COUNT))) trigger with DISCARDING accumulation; otherwise it is a plain global window with no explicit trigger. --><span class="k">def</span> <span class="nf">_window_fn</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""Set the correct WindowInto PTransform"""</span> |
| |
| <span class="c1"># The user-supplied triggering_frequency is often chosen to control how</span> |
| <span class="c1"># many BigQuery load jobs are triggered, to prevent going over BigQuery's</span> |
| <span class="c1"># daily quota for load jobs. If this is set to a large value, currently we</span> |
| <span class="c1"># have to buffer all the data until the trigger fires. Instead we ensure</span> |
| <span class="c1"># that the files are written if a threshold number of records are ready.</span> |
| <span class="c1"># We use only the user-supplied trigger on the actual BigQuery load.</span> |
| <span class="c1"># This allows us to offload the data to the filesystem.</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">(),</span> |
| <span class="n">trigger</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">Repeatedly</span><span class="p">(</span> |
| <span class="n">trigger</span><span class="o">.</span><span class="n">AfterAny</span><span class="p">(</span> |
| <span class="n">trigger</span><span class="o">.</span><span class="n">AfterProcessingTime</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">),</span> |
| <span class="n">trigger</span><span class="o">.</span><span class="n">AfterCount</span><span class="p">(</span> |
| <span class="n">_FILE_TRIGGERING_RECORD_COUNT</span><span class="p">))),</span> |
| <span class="n">accumulation_mode</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">AccumulationMode</span>\ |
| <span class="o">.</span><span class="n">DISCARDING</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span> |
| |
| <!-- BigQueryBatchFileLoads._write_files: applies a WriteRecordsToFile ParDo (configured from self.max_files_per_bundle, self.max_file_size and self.coder, with file_prefix_pcv as the file_prefix argument) and reads its two tagged outputs. The UNWRITTEN_RECORD_TAG output is passed through _ShardDestinations, GroupByKey, a Map that keeps x[0][0] and x[1], and WriteGroupedRecordsToFile. Both file collections are then flattened. In a streaming pipeline the flattened result is re-windowed into GlobalWindows with Repeatedly(AfterAll(AfterProcessingTime(self.triggering_frequency), AfterCount(1))) and DISCARDING accumulation before it is returned. --><span class="k">def</span> <span class="nf">_write_files</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">destination_data_kv_pc</span><span class="p">,</span> <span class="n">file_prefix_pcv</span><span class="p">):</span> |
| <span class="n">outputs</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">destination_data_kv_pc</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">WriteRecordsToFile</span><span class="p">(</span><span class="n">max_files_per_bundle</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span><span class="p">,</span> |
| <span class="n">max_file_size</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">,</span> |
| <span class="n">coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">coder</span><span class="p">),</span> |
| <span class="n">file_prefix</span><span class="o">=</span><span class="n">file_prefix_pcv</span><span class="p">)</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span> |
| <span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">UNWRITTEN_RECORD_TAG</span><span class="p">,</span> |
| <span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">))</span> |
| |
| <span class="c1"># A PCollection of (destination, file) tuples. It lists files with records,</span> |
| <span class="c1"># and the destination each file is meant to be imported into.</span> |
| <span class="n">destination_files_kv_pc</span> <span class="o">=</span> <span class="n">outputs</span><span class="p">[</span><span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">]</span> |
| |
| <span class="c1"># A PCollection of (destination, record) tuples. These are later sharded,</span> |
| <span class="c1"># grouped, and all records for each destination-shard is written to files.</span> |
| <span class="c1"># This PCollection is necessary because not all records can be written into</span> |
| <span class="c1"># files in ``WriteRecordsToFile``.</span> |
| <span class="n">unwritten_records_pc</span> <span class="o">=</span> <span class="n">outputs</span><span class="p">[</span><span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">UNWRITTEN_RECORD_TAG</span><span class="p">]</span> |
| |
| <span class="n">more_destination_files_kv_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">unwritten_records_pc</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">_ShardDestinations</span><span class="p">())</span> |
| <span class="o">|</span> <span class="s2">"GroupShardedRows"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span> |
| <span class="o">|</span> <span class="s2">"DropShardNumber"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="n">x</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">],</span> <span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">]))</span> |
| <span class="o">|</span> <span class="s2">"WriteGroupedRecordsToFile"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">WriteGroupedRecordsToFile</span><span class="p">(</span> |
| <span class="n">coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">coder</span><span class="p">),</span> <span class="n">file_prefix</span><span class="o">=</span><span class="n">file_prefix_pcv</span><span class="p">))</span> |
| |
| <span class="n">all_destination_file_pairs_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="p">(</span><span class="n">destination_files_kv_pc</span><span class="p">,</span> <span class="n">more_destination_files_kv_pc</span><span class="p">)</span> |
| <span class="o">|</span> <span class="s2">"DestinationFilesUnion"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">())</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span><span class="p">:</span> |
| <span class="c1"># Apply the user's trigger back before we start triggering load jobs</span> |
| <span class="n">all_destination_file_pairs_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">all_destination_file_pairs_pc</span> |
| <span class="o">|</span> <span class="s2">"ApplyUserTrigger"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span> |
| <span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">(),</span> |
| <span class="n">trigger</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">Repeatedly</span><span class="p">(</span> |
| <span class="n">trigger</span><span class="o">.</span><span class="n">AfterAll</span><span class="p">(</span> |
| <span class="n">trigger</span><span class="o">.</span><span class="n">AfterProcessingTime</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">),</span> |
| <span class="n">trigger</span><span class="o">.</span><span class="n">AfterCount</span><span class="p">(</span><span class="mi">1</span><span class="p">))),</span> |
| <span class="n">accumulation_mode</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">AccumulationMode</span><span class="o">.</span><span class="n">DISCARDING</span><span class="p">))</span> |
| <span class="k">return</span> <span class="n">all_destination_file_pairs_pc</span> |
| |
| <div class="viewcode-block" id="BigQueryBatchFileLoads.expand"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="n">p</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span> |
| |
| <span class="n">temp_location</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">temp_location</span> |
| |
| <span class="n">load_job_name_pcv</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span> |
| <span class="n">p</span> |
| <span class="o">|</span> <span class="s2">"ImpulseJobName"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="n">_generate_load_job_name</span><span class="p">()))</span> |
| |
| <span class="n">file_prefix_pcv</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span> |
| <span class="n">p</span> |
| <span class="o">|</span> <span class="s2">"CreateFilePrefixView"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="s1">''</span><span class="p">])</span> |
| <span class="o">|</span> <span class="s2">"GenerateFilePrefix"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span> |
| <span class="n">file_prefix_generator</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_validate</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="p">,</span> |
| <span class="n">temp_location</span><span class="p">)))</span> |
| |
| <span class="n">destination_data_kv_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">pcoll</span> |
| <span class="o">|</span> <span class="s2">"RewindowIntoGlobal"</span> <span class="o">>></span> <span class="bp">self</span><span class="o">.</span><span class="n">_window_fn</span><span class="p">()</span> |
| <span class="o">|</span> <span class="s2">"AppendDestination"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">bigquery_tools</span><span class="o">.</span><span class="n">AppendDestinationsFn</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">destination</span><span class="p">),</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">table_side_inputs</span><span class="p">))</span> |
| |
| <span class="n">all_destination_file_pairs_pc</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_write_files</span><span class="p">(</span><span class="n">destination_data_kv_pc</span><span class="p">,</span> |
| <span class="n">file_prefix_pcv</span><span class="p">)</span> |
| |
| <span class="n">grouped_files_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">all_destination_file_pairs_pc</span> |
| <span class="o">|</span> <span class="s2">"GroupFilesByTableDestinations"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">())</span> |
| |
| <span class="c1"># Load Jobs are triggered to temporary tables, and those are later copied to</span> |
| <span class="c1"># the actual appropriate destination table. This ensures atomicity when only</span> |
| <span class="c1"># some of the load jobs would fail but not others.</span> |
| <span class="c1"># If any of them fails, then copy jobs are not triggered.</span> |
| <span class="n">trigger_loads_outputs</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">grouped_files_pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">TriggerLoadJobs</span><span class="p">(</span> |
| <span class="n">schema</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span> |
| <span class="n">test_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">,</span> |
| <span class="n">temporary_tables</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">temp_tables</span><span class="p">,</span> |
| <span class="n">additional_bq_parameters</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">),</span> |
| <span class="n">load_job_name_pcv</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span><span class="p">)</span> |
| <span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span><span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">TEMP_TABLES</span><span class="p">,</span> <span class="n">main</span><span class="o">=</span><span class="s1">'main'</span><span class="p">)</span> |
| <span class="p">)</span> |
| |
| <span class="n">destination_job_ids_pc</span> <span class="o">=</span> <span class="n">trigger_loads_outputs</span><span class="p">[</span><span class="s1">'main'</span><span class="p">]</span> |
| <span class="n">temp_tables_pc</span> <span class="o">=</span> <span class="n">trigger_loads_outputs</span><span class="p">[</span><span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">TEMP_TABLES</span><span class="p">]</span> |
| |
| <span class="n">destination_copy_job_ids_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">p</span> |
| <span class="o">|</span> <span class="s2">"ImpulseMonitorLoadJobs"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span> |
| <span class="o">|</span> <span class="s2">"WaitForLoadJobs"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">WaitForBQJobs</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">),</span> |
| <span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">AsList</span><span class="p">(</span><span class="n">destination_job_ids_pc</span><span class="p">))</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">TriggerCopyJobs</span><span class="p">(</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span> |
| <span class="n">temporary_tables</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">temp_tables</span><span class="p">,</span> |
| <span class="n">test_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">),</span> <span class="n">load_job_name_pcv</span><span class="p">))</span> |
| |
| <span class="n">finished_copy_jobs_pc</span> <span class="o">=</span> <span class="p">(</span><span class="n">p</span> |
| <span class="o">|</span> <span class="s2">"ImpulseMonitorCopyJobs"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span> |
| <span class="o">|</span> <span class="s2">"WaitForCopyJobs"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">WaitForBQJobs</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">),</span> |
| <span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">AsList</span><span class="p">(</span><span class="n">destination_copy_job_ids_pc</span><span class="p">)</span> |
| <span class="p">))</span> |
| |
| <span class="n">_</span> <span class="o">=</span> <span class="p">(</span><span class="n">finished_copy_jobs_pc</span> |
| <span class="o">|</span> <span class="s2">"RemoveTempTables/PassTables"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span> |
| <span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">deleting_tables</span><span class="p">:</span> <span class="n">deleting_tables</span><span class="p">,</span> |
| <span class="n">pvalue</span><span class="o">.</span><span class="n">AsIter</span><span class="p">(</span><span class="n">temp_tables_pc</span><span class="p">))</span> |
| <span class="o">|</span> <span class="s2">"RemoveTempTables/AddUselessValue"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="kc">None</span><span class="p">))</span> |
| <span class="o">|</span> <span class="s2">"RemoveTempTables/DeduplicateTables"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span> |
| <span class="o">|</span> <span class="s2">"RemoveTempTables/GetTableNames"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">elm</span><span class="p">:</span> <span class="n">elm</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> |
| <span class="o">|</span> <span class="s2">"RemoveTempTables/Delete"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">DeleteTablesFn</span><span class="p">()))</span> |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">DESTINATION_JOBID_PAIRS</span><span class="p">:</span> <span class="n">destination_job_ids_pc</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">DESTINATION_FILE_PAIRS</span><span class="p">:</span> <span class="n">all_destination_file_pairs_pc</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">DESTINATION_COPY_JOBID_PAIRS</span><span class="p">:</span> <span class="n">destination_copy_job_ids_pc</span><span class="p">,</span> |
| <span class="p">}</span></div></div> |
| </pre></div> |
| |
| </div> |
| <div class="articleComments"> |
| |
| </div> |
| </div> |
| <footer> |
| <!-- Page footer: copyright notice followed by the build/theme attribution. --> |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <p> |
| <!-- NOTE(review): the copyright holder is empty in the generated output; presumably the Sphinx `copyright` setting is unset. Confirm in conf.py. --> |
| © Copyright . |
| |
| </p> |
| </div> |
| Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| |
| |
| <script type="text/javascript"> |
| // Page-level settings published as a global object. |
| // NOTE(review): presumably read by the Sphinx scripts loaded after this block (e.g. doctools.js); confirm. |
| var DOCUMENTATION_OPTIONS = { |
| URL_ROOT: "../../../../", // relative path from this page back to the documentation root |
| VERSION: "", |
| COLLAPSE_INDEX: false, |
| FILE_SUFFIX: ".html", |
| HAS_SOURCE: true, |
| SOURCELINK_SUFFIX: ".txt" |
| }; |
| </script> |
| <script type="text/javascript" src="../../../../_static/jquery.js"></script> |
| <script type="text/javascript" src="../../../../_static/underscore.js"></script> |
| <script type="text/javascript" src="../../../../_static/doctools.js"></script> |
| |
| |
| |
| |
| |
| <script type="text/javascript" src="../../../../_static/js/theme.js"></script> |
| |
| |
| |
| |
| <script type="text/javascript"> |
| // Once the DOM is ready, switch on the theme's sticky navigation. |
| jQuery(document).ready(function () { |
| SphinxRtdTheme.StickyNav.enable(); |
| }); |
| </script> |
| |
| |
| </body> |
| </html> |