| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
|  |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
|  |
| <title>apache_beam.io.gcp.bigquery_file_loads — Apache Beam 2.47.0 documentation</title> |
|  |
| <!-- 'type' is omitted on scripts and stylesheet links below: text/javascript and |
|      text/css are the defaults in HTML5. Script order is load-bearing (jQuery must |
|      precede doctools/theme.js), so no defer/async is added to the local scripts. --> |
| <script src="../../../../_static/js/modernizr.min.js"></script> |
|  |
| <script id="documentation_options" data-url_root="../../../../" src="../../../../_static/documentation_options.js"></script> |
| <script src="../../../../_static/jquery.js"></script> |
| <script src="../../../../_static/underscore.js"></script> |
| <script src="../../../../_static/doctools.js"></script> |
| <script src="../../../../_static/language_data.js"></script> |
| <!-- MathJax is independent of the other scripts; 'async' is a boolean attribute, |
|      so its presence alone is sufficient (async="async" is redundant). --> |
| <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
|  |
| <script src="../../../../_static/js/theme.js"></script> |
|  |
| <!-- link/meta are void elements: no closing tag and no trailing slash. --> |
| <link rel="stylesheet" href="../../../../_static/css/theme.css"> |
| <link rel="stylesheet" href="../../../../_static/pygments.css"> |
| <link rel="index" title="Index" href="../../../../genindex.html"> |
| <link rel="search" title="Search" href="../../../../search.html"> |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search"> |
|  |
| <a href="../../../../index.html" class="icon icon-home"> Apache Beam |
| </a> |
|  |
| <div class="version"> |
| 2.47.0 |
| </div> |
|  |
| <div role="search"> |
| <form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get"> |
| <!-- placeholder text disappears on input and is not a reliable accessible name, |
|      so the field carries an explicit aria-label. type stays "text" because the |
|      theme CSS may target input[type=text] — TODO confirm before switching to |
|      type="search". input is a void element: no trailing slash. --> |
| <input type="text" name="q" placeholder="Search docs" aria-label="Search docs"> |
| <input type="hidden" name="check_keywords" value="yes"> |
| <input type="hidden" name="area" value="default"> |
| </form> |
| </div> |
|  |
| </div> |
|  |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
|  |
| <!-- Package-level table of contents generated by Sphinx. --> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.io.html">apache_beam.io package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.ml.html">apache_beam.ml package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.options.html">apache_beam.options package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.testing.html">apache_beam.testing package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.yaml.html">apache_beam.yaml package</a></li> |
| </ul> |
| <!-- Module-level table of contents. --> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.error.html">apache_beam.error module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
| </ul> |
|  |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <!-- Mobile header bar: the hamburger icon toggles the sidebar via theme.js, which |
|      binds to the data-toggle="wy-nav-top" hook. |
|      NOTE(review): the click target is an <i>, which is not keyboard-focusable and has |
|      no accessible name; a <button type="button" aria-label="Toggle navigation"> |
|      carrying the same data-toggle attribute would be preferable — confirm against the |
|      theme's CSS/JS selectors before changing. --> |
| <nav class="wy-nav-top" aria-label="top navigation"> |
|  |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="../../../../index.html">Apache Beam</a> |
|  |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <!-- Breadcrumb trail: Docs » Module code » current module. --> |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
|  |
| <ul class="wy-breadcrumbs"> |
|  |
| <li><a href="../../../../index.html">Docs</a> »</li> |
|  |
| <li><a href="../../../index.html">Module code</a> »</li> |
|  |
| <li>apache_beam.io.gcp.bigquery_file_loads</li> |
|  |
| <!-- Styling hook kept empty on module-code pages (holds "View source" links elsewhere). --> |
| <li class="wy-breadcrumbs-aside"> |
|  |
| </li> |
|  |
| </ul> |
|  |
| <!-- hr is a void element: no self-closing slash. --> |
| <hr> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.io.gcp.bigquery_file_loads</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="sd">"""</span> |
| <span class="sd">Functionality to perform file loads into BigQuery for Batch and Streaming</span> |
| <span class="sd">pipelines.</span> |
| |
| <span class="sd">This source is able to work around BigQuery load quotas and limitations. When</span> |
| <span class="sd">destinations are dynamic, or when data for a single job is too large, the data</span> |
| <span class="sd">will be split into multiple jobs.</span> |
| |
| <span class="sd">NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.</span> |
| <span class="sd">"""</span> |
| |
| <span class="c1"># pytype: skip-file</span> |
| |
| <span class="kn">import</span> <span class="nn">hashlib</span> |
| <span class="kn">import</span> <span class="nn">io</span> |
| <span class="kn">import</span> <span class="nn">logging</span> |
| <span class="kn">import</span> <span class="nn">random</span> |
| <span class="kn">import</span> <span class="nn">time</span> |
| <span class="kn">import</span> <span class="nn">uuid</span> |
| |
| <span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span> |
| <span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">pvalue</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io</span> <span class="kn">import</span> <span class="n">filesystems</span> <span class="k">as</span> <span class="n">fs</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp</span> <span class="kn">import</span> <span class="n">bigquery_tools</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.bigquery_io_metadata</span> <span class="kn">import</span> <span class="n">create_bigquery_io_metadata</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.options</span> <span class="kn">import</span> <span class="n">value_provider</span> <span class="k">as</span> <span class="n">vp</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="kn">import</span> <span class="n">GoogleCloudOptions</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">trigger</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="kn">import</span> <span class="n">DisplayDataItem</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms.util</span> <span class="kn">import</span> <span class="n">GroupIntoBatches</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">GlobalWindows</span> |
| |
| <span class="c1"># Protect against environments where bigquery library is not available.</span> |
| <span class="c1"># pylint: disable=wrong-import-order, wrong-import-position</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">apitools.base.py.exceptions</span> <span class="kn">import</span> <span class="n">HttpError</span> |
| <span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span> |
| <span class="k">pass</span> |
| |
| <span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span> |
| |
| <span class="n">ONE_TERABYTE</span> <span class="o">=</span> <span class="p">(</span><span class="mi">1</span> <span class="o"><<</span> <span class="mi">40</span><span class="p">)</span> |
| |
| <span class="c1"># The maximum file size for imports is 5TB. We keep our files under that.</span> |
| <span class="n">_DEFAULT_MAX_FILE_SIZE</span> <span class="o">=</span> <span class="mi">4</span> <span class="o">*</span> <span class="n">ONE_TERABYTE</span> |
| |
| <span class="n">_DEFAULT_MAX_WRITERS_PER_BUNDLE</span> <span class="o">=</span> <span class="mi">20</span> |
| |
| <span class="c1"># The maximum size for a single load job is one terabyte</span> |
| <span class="n">_MAXIMUM_LOAD_SIZE</span> <span class="o">=</span> <span class="mi">15</span> <span class="o">*</span> <span class="n">ONE_TERABYTE</span> |
| |
| <span class="c1"># Big query only supports up to 10 thousand URIs for a single load job.</span> |
| <span class="n">_MAXIMUM_SOURCE_URIS</span> <span class="o">=</span> <span class="mi">10</span> <span class="o">*</span> <span class="mi">1000</span> |
| |
| <span class="c1"># If triggering_frequency is supplied, we will trigger the file write after</span> |
| <span class="c1"># this many records are written.</span> |
| <span class="n">_FILE_TRIGGERING_RECORD_COUNT</span> <span class="o">=</span> <span class="mi">500000</span> |
| |
| <span class="c1"># If using auto-sharding for unbounded data, we batch the records before</span> |
| <span class="c1"># triggering file write to avoid generating too many small files.</span> |
| <span class="n">_FILE_TRIGGERING_BATCHING_DURATION_SECS</span> <span class="o">=</span> <span class="mi">1</span> |
| |
| <span class="c1"># How many seconds we wait before polling a pending job</span> |
| <span class="n">_SLEEP_DURATION_BETWEEN_POLLS</span> <span class="o">=</span> <span class="mi">10</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_generate_job_name</span><span class="p">(</span><span class="n">job_name</span><span class="p">,</span> <span class="n">job_type</span><span class="p">,</span> <span class="n">step_name</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">generate_bq_job_name</span><span class="p">(</span> |
| <span class="n">job_name</span><span class="o">=</span><span class="n">job_name</span><span class="p">,</span> |
| <span class="n">step_id</span><span class="o">=</span><span class="n">step_name</span><span class="p">,</span> |
| <span class="n">job_type</span><span class="o">=</span><span class="n">job_type</span><span class="p">,</span> |
| <span class="n">random</span><span class="o">=</span><span class="n">random</span><span class="o">.</span><span class="n">randint</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="mi">1000</span><span class="p">))</span> |
| |
| |
| <div class="viewcode-block" id="file_prefix_generator"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.file_prefix_generator">[docs]</a><span class="k">def</span> <span class="nf">file_prefix_generator</span><span class="p">(</span> |
| <span class="n">with_validation</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">pipeline_gcs_location</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">temp_location</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">_generate_file_prefix</span><span class="p">(</span><span class="n">unused_elm</span><span class="p">):</span> |
| <span class="c1"># If a gcs location is provided to the pipeline, then we shall use that.</span> |
| <span class="c1"># Otherwise, we shall use the temp_location from pipeline options.</span> |
| <span class="n">gcs_base</span> <span class="o">=</span> <span class="n">pipeline_gcs_location</span><span class="o">.</span><span class="n">get</span><span class="p">()</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">gcs_base</span><span class="p">:</span> |
| <span class="n">gcs_base</span> <span class="o">=</span> <span class="n">temp_location</span> |
| |
| <span class="c1"># This will fail at pipeline execution time, but will fail early, as this</span> |
| <span class="c1"># step doesn't have any dependencies (and thus will be one of the first</span> |
| <span class="c1"># stages to be run).</span> |
| <span class="k">if</span> <span class="n">with_validation</span> <span class="ow">and</span> <span class="p">(</span><span class="ow">not</span> <span class="n">gcs_base</span> <span class="ow">or</span> <span class="ow">not</span> <span class="n">gcs_base</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">'gs://'</span><span class="p">)):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'Invalid GCS location: </span><span class="si">%r</span><span class="s1">.</span><span class="se">\n</span><span class="s1">'</span> |
| <span class="s1">'Writing to BigQuery with FILE_LOADS method requires a'</span> |
| <span class="s1">' GCS location to be provided to write files to be loaded'</span> |
| <span class="s1">' into BigQuery. Please provide a GCS bucket through'</span> |
| <span class="s1">' custom_gcs_temp_location in the constructor of WriteToBigQuery'</span> |
| <span class="s1">' or the fallback option --temp_location, or pass'</span> |
| <span class="s1">' method="STREAMING_INSERTS" to WriteToBigQuery.'</span> <span class="o">%</span> <span class="n">gcs_base</span><span class="p">)</span> |
| |
| <span class="n">prefix_uuid</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">()</span> |
| <span class="k">return</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">gcs_base</span><span class="p">,</span> <span class="s1">'bq_load'</span><span class="p">,</span> <span class="n">prefix_uuid</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">_generate_file_prefix</span></div> |
| |
| |
| <span class="k">def</span> <span class="nf">_make_new_file_writer</span><span class="p">(</span> |
| <span class="n">file_prefix</span><span class="p">,</span> |
| <span class="n">destination</span><span class="p">,</span> |
| <span class="n">file_format</span><span class="p">,</span> |
| <span class="n">schema</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">schema_side_inputs</span><span class="o">=</span><span class="nb">tuple</span><span class="p">()):</span> |
| <span class="n">destination</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">get_hashable_destination</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span> |
| |
| <span class="c1"># Windows does not allow : on filenames. Replacing with underscore.</span> |
| <span class="c1"># Other disallowed characters are:</span> |
| <span class="c1"># https://docs.microsoft.com/en-us/windows/desktop/fileio/naming-a-file</span> |
| <span class="n">destination</span> <span class="o">=</span> <span class="n">destination</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">':'</span><span class="p">,</span> <span class="s1">'.'</span><span class="p">)</span> |
| |
| <span class="n">directory</span> <span class="o">=</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">exists</span><span class="p">(</span><span class="n">directory</span><span class="p">):</span> |
| <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">mkdirs</span><span class="p">(</span><span class="n">directory</span><span class="p">)</span> |
| |
| <span class="n">file_name</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span> |
| <span class="n">file_path</span> <span class="o">=</span> <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">file_prefix</span><span class="p">,</span> <span class="n">destination</span><span class="p">,</span> <span class="n">file_name</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">file_format</span> <span class="o">==</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">FileFormat</span><span class="o">.</span><span class="n">AVRO</span><span class="p">:</span> |
| <span class="k">if</span> <span class="nb">callable</span><span class="p">(</span><span class="n">schema</span><span class="p">):</span> |
| <span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span><span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">schema</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span> |
| <span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span><span class="o">.</span><span class="n">get</span><span class="p">()</span> |
| |
| <span class="n">writer</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">AvroRowWriter</span><span class="p">(</span> |
| <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">create</span><span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="s2">"application/avro"</span><span class="p">),</span> <span class="n">schema</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">file_format</span> <span class="o">==</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">FileFormat</span><span class="o">.</span><span class="n">JSON</span><span class="p">:</span> |
| <span class="n">writer</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">JsonRowWriter</span><span class="p">(</span> |
| <span class="n">fs</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">create</span><span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="s2">"application/text"</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">((</span> |
| <span class="s1">'Only AVRO and JSON are supported as intermediate formats for '</span> |
| <span class="s1">'BigQuery WriteRecordsToFile, got: </span><span class="si">{}</span><span class="s1">.'</span><span class="p">)</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">file_format</span><span class="p">))</span> |
| |
| <span class="k">return</span> <span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_bq_uuid</span><span class="p">(</span><span class="n">seed</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">seed</span><span class="p">:</span> |
| <span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s2">"-"</span><span class="p">,</span> <span class="s2">""</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="n">hashlib</span><span class="o">.</span><span class="n">md5</span><span class="p">(</span><span class="n">seed</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf8'</span><span class="p">))</span><span class="o">.</span><span class="n">hexdigest</span><span class="p">())</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_ShardDestinations</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Adds a shard number to the key of the KV element.</span> |
| |
| <span class="sd"> Experimental; no backwards compatibility guarantees."""</span> |
| <span class="n">DEFAULT_SHARDING_FACTOR</span> <span class="o">=</span> <span class="mi">10</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sharding_factor</span><span class="o">=</span><span class="n">DEFAULT_SHARDING_FACTOR</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sharding_factor</span> <span class="o">=</span> <span class="n">sharding_factor</span> |
| |
| <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_shard_count</span> <span class="o">=</span> <span class="n">random</span><span class="o">.</span><span class="n">randrange</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sharding_factor</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="n">row</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> |
| |
| <span class="n">sharded_destination</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">destination</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_shard_count</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">sharding_factor</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_shard_count</span> <span class="o">+=</span> <span class="mi">1</span> |
| <span class="k">yield</span> <span class="p">(</span><span class="n">sharded_destination</span><span class="p">,</span> <span class="n">row</span><span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="WriteRecordsToFile"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile">[docs]</a><span class="k">class</span> <span class="nc">WriteRecordsToFile</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Write input records to files before triggering a load job.</span> |
| |
| <span class="sd"> This transform keeps up to ``max_files_per_bundle`` files open to write to. It</span> |
| <span class="sd"> receives (destination, record) tuples, and it writes the records to different</span> |
| <span class="sd"> files for each destination.</span> |
| |
| <span class="sd"> If there are more than ``max_files_per_bundle`` destinations that we need to</span> |
| <span class="sd"> write to, then those records are grouped by their destination, and later</span> |
| <span class="sd"> written to files by ``WriteGroupedRecordsToFile``.</span> |
| |
| <span class="sd"> It outputs two PCollections.</span> |
| <span class="sd"> """</span> |
| |
| <span class="n">UNWRITTEN_RECORD_TAG</span> <span class="o">=</span> <span class="s1">'UnwrittenRecords'</span> |
| <span class="n">WRITTEN_FILE_TAG</span> <span class="o">=</span> <span class="s1">'WrittenFiles'</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">schema</span><span class="p">,</span> |
| <span class="n">max_files_per_bundle</span><span class="o">=</span><span class="n">_DEFAULT_MAX_WRITERS_PER_BUNDLE</span><span class="p">,</span> |
| <span class="n">max_file_size</span><span class="o">=</span><span class="n">_DEFAULT_MAX_FILE_SIZE</span><span class="p">,</span> |
| <span class="n">file_format</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Initialize a :class:`WriteRecordsToFile`.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> max_files_per_bundle (int): The maximum number of files that can be kept</span> |
| <span class="sd"> open during execution of this step in a worker. This is to avoid over-</span> |
| <span class="sd"> whelming the worker memory.</span> |
| <span class="sd"> max_file_size (int): The maximum size in bytes for a file to be used in</span> |
| <span class="sd"> an export job.</span> |
| |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span> <span class="o">=</span> <span class="n">max_files_per_bundle</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span> <span class="o">=</span> <span class="n">max_file_size</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">file_format</span> <span class="o">=</span> <span class="n">file_format</span> <span class="ow">or</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">FileFormat</span><span class="o">.</span><span class="n">JSON</span> |
| |
| <div class="viewcode-block" id="WriteRecordsToFile.display_data"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="s1">'max_files_per_bundle'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span><span class="p">,</span> |
| <span class="s1">'max_file_size'</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">),</span> |
| <span class="s1">'file_format'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_format</span><span class="p">,</span> |
| <span class="p">}</span></div> |
| |
| <div class="viewcode-block" id="WriteRecordsToFile.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span> <span class="o">=</span> <span class="p">{}</span></div> |
| |
<!-- Generated listing of WriteRecordsToFile.process. Fix: the comparison operators were emitted as raw < and > inside character data; a bare < in HTML text is invalid and can derail parsing, so both are escaped as &amp;lt; / &amp;gt; per standard Pygments output. --><div class="viewcode-block" id="WriteRecordsToFile.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">file_prefix</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""Take a tuple with (destination, row) and write to file or spill out.</span>

<span class="sd"> Destination may be a ``TableReference`` or a string, and row is a</span>
<span class="sd"> Python dictionary for a row to be inserted to BigQuery."""</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">get_hashable_destination</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="n">row</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>

<span class="k">if</span> <span class="n">destination</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">)</span> <span class="o">&lt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">[</span><span class="n">destination</span><span class="p">]</span> <span class="o">=</span> <span class="n">_make_new_file_writer</span><span class="p">(</span>
<span class="n">file_prefix</span><span class="p">,</span>
<span class="n">destination</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_format</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span>
<span class="n">schema_side_inputs</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span>
<span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">UNWRITTEN_RECORD_TAG</span><span class="p">,</span> <span class="n">element</span><span class="p">)</span>
<span class="k">return</span>

<span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span><span class="p">)</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="p">[</span><span class="n">destination</span><span class="p">]</span>

<span class="c1"># TODO(pabloem): Is it possible for this to throw exception?</span>
<span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">row</span><span class="p">)</span>

<span class="n">file_size</span> <span class="o">=</span> <span class="n">writer</span><span class="o">.</span><span class="n">tell</span><span class="p">()</span>
<span class="k">if</span> <span class="n">file_size</span> <span class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">:</span>
<span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span>
<span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">,</span>
<span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">)))</span></div>
| |
<!-- Generated listing of WriteRecordsToFile.finish_bundle: flushes all open writers and emits (destination, (path, size)) in the global window. Closes the WriteRecordsToFile class div. --><div class="viewcode-block" id="WriteRecordsToFile.finish_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile.finish_bundle">[docs]</a> <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">for</span> <span class="n">destination</span><span class="p">,</span> <span class="n">file_path_writer</span> <span class="ow">in</span> \
<span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span><span class="p">)</span> <span class="o">=</span> <span class="n">file_path_writer</span>
<span class="n">file_size</span> <span class="o">=</span> <span class="n">writer</span><span class="o">.</span><span class="n">tell</span><span class="p">()</span>
<span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span>
<span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">,</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">((</span><span class="n">destination</span><span class="p">,</span> <span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">))))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_destination_to_file_writer</span> <span class="o">=</span> <span class="p">{}</span></div></div>
| |
| |
<!-- Generated listing: WriteGroupedRecordsToFile class header, docstring, and __init__. The class div stays open; the nested process div below closes it. --><div class="viewcode-block" id="WriteGroupedRecordsToFile"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteGroupedRecordsToFile">[docs]</a><span class="k">class</span> <span class="nc">WriteGroupedRecordsToFile</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""Receives collection of dest-iterable(records), writes it to files.</span>

<span class="sd"> This is different from ``WriteRecordsToFile`` because it receives records</span>
<span class="sd"> grouped by destination. This means that it's not necessary to keep multiple</span>
<span class="sd"> file descriptors open, because we know for sure when records for a single</span>
<span class="sd"> destination have been written out.</span>

<span class="sd"> Experimental; no backwards compatibility guarantees.</span>
<span class="sd"> """</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">schema</span><span class="p">,</span> <span class="n">max_file_size</span><span class="o">=</span><span class="n">_DEFAULT_MAX_FILE_SIZE</span><span class="p">,</span> <span class="n">file_format</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span> <span class="o">=</span> <span class="n">max_file_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_format</span> <span class="o">=</span> <span class="n">file_format</span> <span class="ow">or</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">FileFormat</span><span class="o">.</span><span class="n">JSON</span>
| |
<!-- Generated listing of WriteGroupedRecordsToFile.process. Fix: the > operator was emitted as a raw > in character data; escaped as &amp;gt; for validity best practice and consistency with standard Pygments escaping. Closes both the process div and the class div. --><div class="viewcode-block" id="WriteGroupedRecordsToFile.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.WriteGroupedRecordsToFile.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">file_prefix</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">):</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">get_hashable_destination</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="n">rows</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>

<span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span>

<span class="k">for</span> <span class="n">row</span> <span class="ow">in</span> <span class="n">rows</span><span class="p">:</span>
<span class="k">if</span> <span class="n">writer</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span><span class="p">)</span> <span class="o">=</span> <span class="n">_make_new_file_writer</span><span class="p">(</span>
<span class="n">file_prefix</span><span class="p">,</span>
<span class="n">destination</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_format</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span>
<span class="n">schema_side_inputs</span><span class="p">)</span>

<span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">row</span><span class="p">)</span>

<span class="n">file_size</span> <span class="o">=</span> <span class="n">writer</span><span class="o">.</span><span class="n">tell</span><span class="p">()</span>
<span class="k">if</span> <span class="n">file_size</span> <span class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">:</span>
<span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">))</span>
<span class="n">file_path</span><span class="p">,</span> <span class="n">writer</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">writer</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">))</span></div></div>
| |
| |
<!-- Generated listing: UpdateDestinationSchema class header, docstring, and __init__. The class div stays open; the finish_bundle div below closes it. --><div class="viewcode-block" id="UpdateDestinationSchema"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema">[docs]</a><span class="k">class</span> <span class="nc">UpdateDestinationSchema</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""Update destination schema based on data that is about to be copied into it.</span>

<span class="sd"> Unlike load and query jobs, BigQuery copy jobs do not support schema field</span>
<span class="sd"> addition or relaxation on the destination table. This DoFn fills that gap by</span>
<span class="sd"> updating the destination table schemas to be compatible with the data coming</span>
<span class="sd"> from the source table so that schema field modification options are respected</span>
<span class="sd"> regardless of whether data is loaded directly to the destination table or</span>
<span class="sd"> loaded into temporary tables before being copied into the destination.</span>

<span class="sd"> This transform takes as input a (destination, job_reference) pair where the</span>
<span class="sd"> job_reference refers to a completed load job into a temporary table.</span>

<span class="sd"> This transform emits (destination, job_reference) pairs where the</span>
<span class="sd"> job_reference refers to a submitted load job for performing the schema</span>
<span class="sd"> modification in JSON format. Note that the input and output job references</span>
<span class="sd"> are not the same.</span>

<span class="sd"> Experimental; no backwards compatibility guarantees.</span>
<span class="sd"> """</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">project</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">additional_bq_parameters</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">step_name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">load_job_project_id</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="n">project</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_test_client</span> <span class="o">=</span> <span class="n">test_client</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span> <span class="o">=</span> <span class="n">additional_bq_parameters</span> <span class="ow">or</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span> <span class="o">=</span> <span class="n">step_name</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_load_job_project_id</span> <span class="o">=</span> <span class="n">load_job_project_id</span>
| |
<!-- Generated listing of UpdateDestinationSchema.start_bundle: builds the BigQuery wrapper, IO metadata, and resets the pending-jobs list. --><div class="viewcode-block" id="UpdateDestinationSchema.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_test_client</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_bq_io_metadata</span> <span class="o">=</span> <span class="n">create_bigquery_io_metadata</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span> <span class="o">=</span> <span class="p">[]</span></div>
| |
<!-- Generated listing of UpdateDestinationSchema.display_data. --><div class="viewcode-block" id="UpdateDestinationSchema.display_data"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">'write_disposition'</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_write_disposition</span><span class="p">),</span>
<span class="s1">'additional_bq_params'</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span><span class="p">),</span>
<span class="p">}</span></div>
| |
<!-- Generated listing of UpdateDestinationSchema.process: decides whether a schema-modification load job is needed and, if so, submits a zero-row JSON load and queues it in pending_jobs. Markup left byte-identical; regenerate from the Python source for code changes. --><div class="viewcode-block" id="UpdateDestinationSchema.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">schema_mod_job_name_prefix</span><span class="p">):</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">temp_table_load_job_reference</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>

<span class="k">if</span> <span class="nb">callable</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span><span class="p">):</span>
<span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span>
<span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_additional_bq_parameters</span>

<span class="c1"># When writing to normal tables WRITE_TRUNCATE will overwrite the schema but</span>
<span class="c1"># when writing to a partition, care needs to be taken to update the schema</span>
<span class="c1"># even on WRITE_TRUNCATE.</span>
<span class="k">if</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_write_disposition</span> <span class="ow">not</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">'WRITE_TRUNCATE'</span><span class="p">,</span> <span class="s1">'WRITE_APPEND'</span><span class="p">)</span> <span class="ow">or</span>
<span class="ow">not</span> <span class="n">additional_parameters</span> <span class="ow">or</span>
<span class="ow">not</span> <span class="n">additional_parameters</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"schemaUpdateOptions"</span><span class="p">)):</span>
<span class="c1"># No need to modify schema of destination table</span>
<span class="k">return</span>

<span class="n">table_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">if</span> <span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span>
<span class="s1">'project'</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">project</span>

<span class="k">try</span><span class="p">:</span>
<span class="c1"># Check if destination table exists</span>
<span class="n">destination_table</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">get_table</span><span class="p">(</span>
<span class="n">project_id</span><span class="o">=</span><span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">dataset_id</span><span class="o">=</span><span class="n">table_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span>
<span class="n">table_id</span><span class="o">=</span><span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)</span>
<span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">exn</span><span class="p">:</span>
<span class="k">if</span> <span class="n">exn</span><span class="o">.</span><span class="n">status_code</span> <span class="o">==</span> <span class="mi">404</span><span class="p">:</span>
<span class="c1"># Destination table does not exist, so no need to modify its schema</span>
<span class="c1"># ahead of the copy jobs.</span>
<span class="k">return</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span>

<span class="n">temp_table_load_job</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">get_job</span><span class="p">(</span>
<span class="n">project</span><span class="o">=</span><span class="n">temp_table_load_job_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">job_id</span><span class="o">=</span><span class="n">temp_table_load_job_reference</span><span class="o">.</span><span class="n">jobId</span><span class="p">,</span>
<span class="n">location</span><span class="o">=</span><span class="n">temp_table_load_job_reference</span><span class="o">.</span><span class="n">location</span><span class="p">)</span>
<span class="n">temp_table_schema</span> <span class="o">=</span> <span class="n">temp_table_load_job</span><span class="o">.</span><span class="n">configuration</span><span class="o">.</span><span class="n">load</span><span class="o">.</span><span class="n">schema</span>

<span class="k">if</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">check_schema_equal</span><span class="p">(</span><span class="n">temp_table_schema</span><span class="p">,</span>
<span class="n">destination_table</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span>
<span class="n">ignore_descriptions</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">ignore_field_order</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
<span class="c1"># Destination table schema is already the same as the temp table schema,</span>
<span class="c1"># so no need to run a job to update the destination table schema.</span>
<span class="k">return</span>

<span class="n">destination_hash</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">(</span>
<span class="s1">'</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">.</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span>
<span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">))</span>
<span class="n">uid</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">()</span>
<span class="n">job_name</span> <span class="o">=</span> <span class="s1">'</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span><span class="n">schema_mod_job_name_prefix</span><span class="p">,</span> <span class="n">destination_hash</span><span class="p">,</span> <span class="n">uid</span><span class="p">)</span>

<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">'Triggering schema modification job </span><span class="si">%s</span><span class="s1"> on </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span>
<span class="n">job_name</span><span class="p">,</span>
<span class="n">table_reference</span><span class="p">)</span>
<span class="c1"># Trigger potential schema modification by loading zero rows into the</span>
<span class="c1"># destination table with the temporary table schema.</span>
<span class="n">schema_update_job_reference</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">perform_load_job</span><span class="p">(</span>
<span class="n">destination</span><span class="o">=</span><span class="n">table_reference</span><span class="p">,</span>
<span class="n">source_stream</span><span class="o">=</span><span class="n">io</span><span class="o">.</span><span class="n">BytesIO</span><span class="p">(),</span> <span class="c1"># file with zero rows</span>
<span class="n">job_id</span><span class="o">=</span><span class="n">job_name</span><span class="p">,</span>
<span class="n">schema</span><span class="o">=</span><span class="n">temp_table_schema</span><span class="p">,</span>
<span class="n">write_disposition</span><span class="o">=</span><span class="s1">'WRITE_APPEND'</span><span class="p">,</span>
<span class="n">create_disposition</span><span class="o">=</span><span class="s1">'CREATE_NEVER'</span><span class="p">,</span>
<span class="n">additional_load_parameters</span><span class="o">=</span><span class="n">additional_parameters</span><span class="p">,</span>
<span class="n">job_labels</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_bq_io_metadata</span><span class="o">.</span><span class="n">add_additional_bq_job_labels</span><span class="p">(),</span>
<span class="c1"># JSON format is hardcoded because zero rows load(unlike AVRO) and</span>
<span class="c1"># a nested schema(unlike CSV, which a default one) is permitted.</span>
<span class="n">source_format</span><span class="o">=</span><span class="s2">"NEWLINE_DELIMITED_JSON"</span><span class="p">,</span>
<span class="n">load_job_project_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_load_job_project_id</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="o">.</span><span class="n">append</span><span class="p">(</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span>
<span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">schema_update_job_reference</span><span class="p">)))</span></div>
| |
<!-- Generated listing of UpdateDestinationSchema.finish_bundle: waits for any pending schema-update jobs and emits them (or a None sentinel). Closes the UpdateDestinationSchema class div. --><div class="viewcode-block" id="UpdateDestinationSchema.finish_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema.finish_bundle">[docs]</a> <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># Unlike the other steps, schema update is not always necessary.</span>
<span class="c1"># In that case, return a None value to avoid blocking in streaming context.</span>
<span class="c1"># Otherwise, the streaming pipeline would get stuck waiting for the</span>
<span class="c1"># TriggerCopyJobs side-input.</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="p">:</span>
<span class="k">return</span> <span class="p">[</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="kc">None</span><span class="p">)]</span>

<span class="k">for</span> <span class="n">windowed_value</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="p">:</span>
<span class="n">job_ref</span> <span class="o">=</span> <span class="n">windowed_value</span><span class="o">.</span><span class="n">value</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">wait_for_bq_job</span><span class="p">(</span>
<span class="n">job_ref</span><span class="p">,</span> <span class="n">sleep_duration_sec</span><span class="o">=</span><span class="n">_SLEEP_DURATION_BETWEEN_POLLS</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span></div></div>
| |
| |
| <div class="viewcode-block" id="TriggerCopyJobs"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs">[docs]</a><span class="k">class</span> <span class="nc">TriggerCopyJobs</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Launches jobs to copy from temporary tables into the main target table.</span> |
| |
| <span class="sd"> When a job needs to write to multiple destination tables, or when a single</span> |
| <span class="sd"> destination table needs to have multiple load jobs to write to it, files are</span> |
| <span class="sd"> loaded into temporary tables, and those tables are later copied to the</span> |
| <span class="sd"> destination tables.</span> |
| |
| <span class="sd"> This transform emits (destination, job_reference) pairs.</span> |
| |
| <span class="sd"> TODO(BEAM-7822): In file loads method of writing to BigQuery,</span> |
| <span class="sd"> copying from temp_tables to destination_table is not atomic.</span> |
| <span class="sd"> See: https://issues.apache.org/jira/browse/BEAM-7822</span> |
| <span class="sd"> """</span> |
| |
| <span class="n">TRIGGER_DELETE_TEMP_TABLES</span> <span class="o">=</span> <span class="s1">'TriggerDeleteTempTables'</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">project</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">step_name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">load_job_project_id</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="n">project</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="n">create_disposition</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_observed_tables</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span> <span class="o">=</span> <span class="n">step_name</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span> <span class="o">=</span> <span class="n">load_job_project_id</span> |
| |
| <div class="viewcode-block" id="TriggerCopyJobs.display_data"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="s1">'launchesBigQueryJobs'</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span> |
| <span class="kc">True</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s2">"This Dataflow job launches bigquery jobs."</span><span class="p">)</span> |
| <span class="p">}</span></div> |
| |
| <div class="viewcode-block" id="TriggerCopyJobs.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_observed_tables</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span> <span class="o">=</span> <span class="n">create_bigquery_io_metadata</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span> <span class="o">=</span> <span class="p">[]</span></div> |
| |
| <div class="viewcode-block" id="TriggerCopyJobs.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="n">element_list</span><span class="p">,</span> <span class="n">job_name_prefix</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">unused_schema_mod_jobs</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element_list</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">):</span> |
| <span class="c1"># Allow this for streaming update compatibility while fixing BEAM-24535.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">process_one</span><span class="p">(</span><span class="n">element_list</span><span class="p">,</span> <span class="n">job_name_prefix</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">element</span> <span class="ow">in</span> <span class="n">element_list</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">process_one</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">job_name_prefix</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="TriggerCopyJobs.process_one"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.process_one">[docs]</a> <span class="k">def</span> <span class="nf">process_one</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">job_name_prefix</span><span class="p">):</span> |
| <span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span> <span class="o">=</span> <span class="n">element</span> |
| |
| <span class="n">copy_to_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span> |
| <span class="s1">'project'</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">project</span> |
| |
| <span class="n">copy_from_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span> |
| <span class="n">copy_from_reference</span><span class="o">.</span><span class="n">tableId</span> <span class="o">=</span> <span class="n">job_reference</span><span class="o">.</span><span class="n">jobId</span> |
| <span class="k">if</span> <span class="n">copy_from_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">copy_from_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span> |
| <span class="s1">'project'</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">project</span> |
| |
| <span class="n">copy_job_name</span> <span class="o">=</span> <span class="s1">'</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span> |
| <span class="n">job_name_prefix</span><span class="p">,</span> |
| <span class="n">_bq_uuid</span><span class="p">(</span> |
| <span class="s1">'</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">.</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span> |
| <span class="n">copy_from_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span> |
| <span class="n">copy_from_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span> |
| <span class="n">copy_from_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)))</span> |
| |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s2">"Triggering copy job from </span><span class="si">%s</span><span class="s2"> to </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> |
| <span class="n">copy_from_reference</span><span class="p">,</span> |
| <span class="n">copy_to_reference</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">copy_to_reference</span><span class="o">.</span><span class="n">tableId</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_observed_tables</span><span class="p">:</span> |
| <span class="c1"># When the write_disposition for a job is WRITE_TRUNCATE,</span> |
| <span class="c1"># multiple copy jobs to the same destination can stomp on</span> |
| <span class="c1"># each other, truncate data, and write to the BQ table over and</span> |
| <span class="c1"># over.</span> |
| <span class="c1"># Thus, the first copy job runs with the user's write_disposition,</span> |
| <span class="c1"># but afterwards, all jobs must always WRITE_APPEND to the table.</span> |
| <span class="c1"># If they do not, subsequent copy jobs will clear out data appended</span> |
| <span class="c1"># by previous jobs.</span> |
| <span class="n">write_disposition</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> |
| <span class="n">wait_for_job</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_observed_tables</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">copy_to_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">wait_for_job</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="n">write_disposition</span> <span class="o">=</span> <span class="s1">'WRITE_APPEND'</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span> <span class="o">=</span> <span class="n">create_bigquery_io_metadata</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span><span class="p">)</span> |
| |
| <span class="n">project_id</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">copy_to_reference</span><span class="o">.</span><span class="n">projectId</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span> <span class="ow">is</span> <span class="kc">None</span> <span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span><span class="p">)</span> |
| <span class="n">job_reference</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">_insert_copy_job</span><span class="p">(</span> |
| <span class="n">project_id</span><span class="p">,</span> |
| <span class="n">copy_job_name</span><span class="p">,</span> |
| <span class="n">copy_from_reference</span><span class="p">,</span> |
| <span class="n">copy_to_reference</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="n">write_disposition</span><span class="p">,</span> |
| <span class="n">job_labels</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span><span class="o">.</span><span class="n">add_additional_bq_job_labels</span><span class="p">())</span> |
| |
| <span class="k">if</span> <span class="n">wait_for_job</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">wait_for_bq_job</span><span class="p">(</span><span class="n">job_reference</span><span class="p">,</span> <span class="n">sleep_duration_sec</span><span class="o">=</span><span class="mi">10</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="o">.</span><span class="n">append</span><span class="p">(</span> |
| <span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">((</span><span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span><span class="p">)))</span></div> |
| |
| <div class="viewcode-block" id="TriggerCopyJobs.finish_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.finish_bundle">[docs]</a> <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">for</span> <span class="n">windowed_value</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="p">:</span> |
| <span class="n">job_ref</span> <span class="o">=</span> <span class="n">windowed_value</span><span class="o">.</span><span class="n">value</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">wait_for_bq_job</span><span class="p">(</span> |
| <span class="n">job_ref</span><span class="p">,</span> <span class="n">sleep_duration_sec</span><span class="o">=</span><span class="n">_SLEEP_DURATION_BETWEEN_POLLS</span><span class="p">)</span> |
| <span class="k">yield</span> <span class="n">windowed_value</span> |
| |
| <span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span> |
| <span class="n">TriggerCopyJobs</span><span class="o">.</span><span class="n">TRIGGER_DELETE_TEMP_TABLES</span><span class="p">,</span> |
| <span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="kc">None</span><span class="p">))</span></div></div> |
| |
| |
| <div class="viewcode-block" id="TriggerLoadJobs"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs">[docs]</a><span class="k">class</span> <span class="nc">TriggerLoadJobs</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Triggers the import jobs to BQ.</span> |
| |
| <span class="sd"> Experimental; no backwards compatibility guarantees.</span> |
| <span class="sd"> """</span> |
| |
| <span class="n">TEMP_TABLES</span> <span class="o">=</span> <span class="s1">'TemporaryTables'</span> |
| <span class="n">ONGOING_JOBS</span> <span class="o">=</span> <span class="s1">'OngoingJobs'</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">schema</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">project</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">temporary_tables</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">additional_bq_parameters</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">source_format</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">step_name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">load_job_project_id</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="n">project</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span> <span class="o">=</span> <span class="n">temporary_tables</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span> <span class="o">=</span> <span class="n">additional_bq_parameters</span> <span class="ow">or</span> <span class="p">{}</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">source_format</span> <span class="o">=</span> <span class="n">source_format</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span> <span class="o">=</span> <span class="n">step_name</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span> <span class="o">=</span> <span class="n">load_job_project_id</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span><span class="p">:</span> |
| <span class="c1"># If we are loading into temporary tables, we rely on the default create</span> |
| <span class="c1"># and write dispositions, which mean that a new table will be created.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="n">create_disposition</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span> |
| |
| <div class="viewcode-block" id="TriggerLoadJobs.display_data"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="s1">'create_disposition'</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">),</span> |
| <span class="s1">'write_disposition'</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">),</span> |
| <span class="s1">'additional_bq_params'</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">),</span> |
| <span class="s1">'schema'</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">),</span> |
| <span class="s1">'launchesBigQueryJobs'</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span> |
| <span class="kc">True</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s2">"This Dataflow job launches bigquery jobs."</span><span class="p">),</span> |
| <span class="s1">'source_format'</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source_format</span><span class="p">),</span> |
| <span class="p">}</span> |
| <span class="k">return</span> <span class="n">result</span></div> |
| |
| <div class="viewcode-block" id="TriggerLoadJobs.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span> <span class="o">=</span> <span class="n">create_bigquery_io_metadata</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span> <span class="o">=</span> <span class="p">[]</span></div> |
| |
| <div class="viewcode-block" id="TriggerLoadJobs.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">load_job_name_prefix</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">):</span> |
| <span class="c1"># Each load job is assumed to have files respecting these constraints:</span> |
| <span class="c1"># 1. Total size of all files < 15 TB (Max size for load jobs)</span> |
| <span class="c1"># 2. Total no. of files in a single load job < 10,000</span> |
| <span class="c1"># This assumption means that there will always be a single load job</span> |
| <span class="c1"># triggered for each partition of files.</span> |
| <span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="n">files</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> |
| |
| <span class="k">if</span> <span class="nb">callable</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">):</span> |
| <span class="n">schema</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="o">*</span><span class="n">schema_side_inputs</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span> |
| <span class="n">schema</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="o">.</span><span class="n">get</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">schema</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span> |
| |
| <span class="k">if</span> <span class="nb">callable</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">):</span> |
| <span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span> |
| <span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="o">.</span><span class="n">get</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">additional_parameters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span> |
| |
| <span class="n">table_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">RuntimeValueProvider</span><span class="o">.</span><span class="n">get_value</span><span class="p">(</span> |
| <span class="s1">'project'</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">project</span> |
| <span class="c1"># Load jobs for a single destination are always triggered from the same</span> |
| <span class="c1"># worker. This means that we can generate a deterministic numbered job id,</span> |
| <span class="c1"># and not need to worry.</span> |
| <span class="n">destination_hash</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">(</span> |
| <span class="s1">'</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">.</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">))</span> |
| <span class="n">uid</span> <span class="o">=</span> <span class="n">_bq_uuid</span><span class="p">()</span> |
| <span class="n">job_name</span> <span class="o">=</span> <span class="s1">'</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span><span class="n">load_job_name_prefix</span><span class="p">,</span> <span class="n">destination_hash</span><span class="p">,</span> <span class="n">uid</span><span class="p">)</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Load job has </span><span class="si">%s</span><span class="s1"> files. Job name is </span><span class="si">%s</span><span class="s1">.'</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">files</span><span class="p">),</span> <span class="n">job_name</span><span class="p">)</span> |
| |
| <span class="n">create_disposition</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">temporary_tables</span><span class="p">:</span> |
| <span class="c1"># If we are using temporary tables, then we must always create the</span> |
| <span class="c1"># temporary tables, so we replace the create_disposition.</span> |
| <span class="n">create_disposition</span> <span class="o">=</span> <span class="s1">'CREATE_IF_NEEDED'</span> |
| <span class="c1"># For temporary tables, we create a new table with the name with JobId.</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span> <span class="o">=</span> <span class="n">job_name</span> |
| <span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span> |
| <span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">TEMP_TABLES</span><span class="p">,</span> |
| <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">get_hashable_destination</span><span class="p">(</span><span class="n">table_reference</span><span class="p">))</span> |
| |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s1">'Triggering job </span><span class="si">%s</span><span class="s1"> to load data to BigQuery table </span><span class="si">%s</span><span class="s1">.'</span> |
| <span class="s1">'Schema: </span><span class="si">%s</span><span class="s1">. Additional parameters: </span><span class="si">%s</span><span class="s1">. Source format: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">job_name</span><span class="p">,</span> |
| <span class="n">table_reference</span><span class="p">,</span> |
| <span class="n">schema</span><span class="p">,</span> |
| <span class="n">additional_parameters</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">source_format</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span> <span class="o">=</span> <span class="n">create_bigquery_io_metadata</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_step_name</span><span class="p">)</span> |
| <span class="n">job_reference</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">perform_load_job</span><span class="p">(</span> |
| <span class="n">destination</span><span class="o">=</span><span class="n">table_reference</span><span class="p">,</span> |
| <span class="n">source_uris</span><span class="o">=</span><span class="n">files</span><span class="p">,</span> |
| <span class="n">job_id</span><span class="o">=</span><span class="n">job_name</span><span class="p">,</span> |
| <span class="n">schema</span><span class="o">=</span><span class="n">schema</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="n">create_disposition</span><span class="p">,</span> |
| <span class="n">additional_load_parameters</span><span class="o">=</span><span class="n">additional_parameters</span><span class="p">,</span> |
| <span class="n">source_format</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source_format</span><span class="p">,</span> |
| <span class="n">job_labels</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">bq_io_metadata</span><span class="o">.</span><span class="n">add_additional_bq_job_labels</span><span class="p">(),</span> |
| <span class="n">load_job_project_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span><span class="p">)</span> |
| <span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span> |
| <span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">ONGOING_JOBS</span><span class="p">,</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span><span class="p">))</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="o">.</span><span class="n">append</span><span class="p">(</span> |
| <span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">((</span><span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span><span class="p">)))</span></div> |
| |
| <div class="viewcode-block" id="TriggerLoadJobs.finish_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs.finish_bundle">[docs]</a> <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">for</span> <span class="n">windowed_value</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span><span class="p">:</span> |
| <span class="n">job_ref</span> <span class="o">=</span> <span class="n">windowed_value</span><span class="o">.</span><span class="n">value</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">wait_for_bq_job</span><span class="p">(</span> |
| <span class="n">job_ref</span><span class="p">,</span> <span class="n">sleep_duration_sec</span><span class="o">=</span><span class="n">_SLEEP_DURATION_BETWEEN_POLLS</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_jobs</span></div></div> |
| |
| |
| <div class="viewcode-block" id="PartitionFiles"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.PartitionFiles">[docs]</a><span class="k">class</span> <span class="nc">PartitionFiles</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| |
| <span class="n">MULTIPLE_PARTITIONS_TAG</span> <span class="o">=</span> <span class="s1">'MULTIPLE_PARTITIONS'</span> |
| <span class="n">SINGLE_PARTITION_TAG</span> <span class="o">=</span> <span class="s1">'SINGLE_PARTITION'</span> |
| |
| <div class="viewcode-block" id="PartitionFiles.Partition"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.PartitionFiles.Partition">[docs]</a> <span class="k">class</span> <span class="nc">Partition</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">max_size</span><span class="p">,</span> <span class="n">max_files</span><span class="p">,</span> <span class="n">files</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">size</span><span class="o">=</span><span class="mi">0</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_size</span> <span class="o">=</span> <span class="n">max_size</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_files</span> <span class="o">=</span> <span class="n">max_files</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">files</span> <span class="o">=</span> <span class="n">files</span> <span class="k">if</span> <span class="n">files</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="k">else</span> <span class="p">[]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">size</span> <span class="o">=</span> <span class="n">size</span> |
| |
| <div class="viewcode-block" id="PartitionFiles.Partition.can_accept"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.PartitionFiles.Partition.can_accept">[docs]</a> <span class="k">def</span> <span class="nf">can_accept</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">file_size</span><span class="p">,</span> <span class="n">no_of_files</span><span class="o">=</span><span class="mi">1</span><span class="p">):</span> |
| <span class="k">if</span> <span class="p">(((</span><span class="bp">self</span><span class="o">.</span><span class="n">size</span> <span class="o">+</span> <span class="n">file_size</span><span class="p">)</span> <span class="o"><=</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_size</span><span class="p">)</span> <span class="ow">and</span> |
| <span class="p">((</span><span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">files</span><span class="p">)</span> <span class="o">+</span> <span class="n">no_of_files</span><span class="p">)</span> <span class="o"><=</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files</span><span class="p">)):</span> |
| <span class="k">return</span> <span class="kc">True</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="kc">False</span></div> |
| |
| <div class="viewcode-block" id="PartitionFiles.Partition.add"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.PartitionFiles.Partition.add">[docs]</a> <span class="k">def</span> <span class="nf">add</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">files</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">file_path</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">size</span> <span class="o">+=</span> <span class="n">file_size</span></div></div> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">max_partition_size</span><span class="p">,</span> <span class="n">max_files_per_partition</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_partition_size</span> <span class="o">=</span> <span class="n">max_partition_size</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_partition</span> <span class="o">=</span> <span class="n">max_files_per_partition</span> |
| |
| <div class="viewcode-block" id="PartitionFiles.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.PartitionFiles.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="n">files</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> |
| <span class="n">partitions</span> <span class="o">=</span> <span class="p">[]</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">files</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span> |
| <span class="s1">'Ignoring a BigQuery batch load partition to </span><span class="si">%s</span><span class="s1"> '</span> |
| <span class="s1">'that contains no source URIs.'</span><span class="p">,</span> |
| <span class="n">destination</span><span class="p">)</span> |
| <span class="k">return</span> |
| |
| <span class="n">latest_partition</span> <span class="o">=</span> <span class="n">PartitionFiles</span><span class="o">.</span><span class="n">Partition</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_partition_size</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_partition</span><span class="p">)</span> |
| |
| <span class="k">for</span> <span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span> <span class="ow">in</span> <span class="n">files</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">latest_partition</span><span class="o">.</span><span class="n">can_accept</span><span class="p">(</span><span class="n">file_size</span><span class="p">):</span> |
| <span class="n">latest_partition</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">partitions</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">latest_partition</span><span class="o">.</span><span class="n">files</span><span class="p">)</span> |
| <span class="n">latest_partition</span> <span class="o">=</span> <span class="n">PartitionFiles</span><span class="o">.</span><span class="n">Partition</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_partition_size</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_partition</span><span class="p">)</span> |
| <span class="n">latest_partition</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">file_path</span><span class="p">,</span> <span class="n">file_size</span><span class="p">)</span> |
| <span class="n">partitions</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">latest_partition</span><span class="o">.</span><span class="n">files</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">partitions</span><span class="p">)</span> <span class="o">></span> <span class="mi">1</span><span class="p">:</span> |
| <span class="n">output_tag</span> <span class="o">=</span> <span class="n">PartitionFiles</span><span class="o">.</span><span class="n">MULTIPLE_PARTITIONS_TAG</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">output_tag</span> <span class="o">=</span> <span class="n">PartitionFiles</span><span class="o">.</span><span class="n">SINGLE_PARTITION_TAG</span> |
| |
| <span class="k">for</span> <span class="n">partition</span> <span class="ow">in</span> <span class="n">partitions</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span><span class="n">output_tag</span><span class="p">,</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">partition</span><span class="p">))</span></div></div> |
| |
| |
| <div class="viewcode-block" id="DeleteTablesFn"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn">[docs]</a><span class="k">class</span> <span class="nc">DeleteTablesFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span> |
| |
| <div class="viewcode-block" id="DeleteTablesFn.start_bundle"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryWrapper</span><span class="p">(</span><span class="n">client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="DeleteTablesFn.process"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">table_reference</span><span class="p">):</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Deleting table </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">table_reference</span><span class="p">)</span> |
| <span class="n">table_reference</span> <span class="o">=</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">table_reference</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">bq_wrapper</span><span class="o">.</span><span class="n">_delete_table</span><span class="p">(</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">projectId</span><span class="p">,</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">datasetId</span><span class="p">,</span> |
| <span class="n">table_reference</span><span class="o">.</span><span class="n">tableId</span><span class="p">)</span></div></div> |
| |
| |
| <div class="viewcode-block" id="BigQueryBatchFileLoads"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads">[docs]</a><span class="k">class</span> <span class="nc">BigQueryBatchFileLoads</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Takes in a set of elements, and inserts them to BigQuery via batch loads.</span> |
| |
| <span class="sd"> """</span> |
| |
| <span class="n">DESTINATION_JOBID_PAIRS</span> <span class="o">=</span> <span class="s1">'destination_load_jobid_pairs'</span> |
| <span class="n">DESTINATION_FILE_PAIRS</span> <span class="o">=</span> <span class="s1">'destination_file_pairs'</span> |
| <span class="n">DESTINATION_COPY_JOBID_PAIRS</span> <span class="o">=</span> <span class="s1">'destination_copy_jobid_pairs'</span> |
| <span class="n">COUNT</span> <span class="o">=</span> <span class="mi">0</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">destination</span><span class="p">,</span> |
| <span class="n">project</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">schema</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">custom_gcs_temp_location</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">triggering_frequency</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">with_auto_sharding</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">temp_file_format</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">max_file_size</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">max_files_per_bundle</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">max_partition_size</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">max_files_per_partition</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">additional_bq_parameters</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">table_side_inputs</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">schema_side_inputs</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">test_client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">validate</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">is_streaming_pipeline</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">load_job_project_id</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">destination</span> <span class="o">=</span> <span class="n">destination</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="n">project</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span> <span class="o">=</span> <span class="n">create_disposition</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">=</span> <span class="n">write_disposition</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span> <span class="o">=</span> <span class="n">triggering_frequency</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">with_auto_sharding</span> <span class="o">=</span> <span class="n">with_auto_sharding</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span> <span class="o">=</span> <span class="n">max_file_size</span> <span class="ow">or</span> <span class="n">_DEFAULT_MAX_FILE_SIZE</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">max_files_per_bundle</span> <span class="ow">or</span> <span class="n">_DEFAULT_MAX_WRITERS_PER_BUNDLE</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_partition_size</span> <span class="o">=</span> <span class="n">max_partition_size</span> <span class="ow">or</span> <span class="n">_MAXIMUM_LOAD_SIZE</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_partition</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">max_files_per_partition</span> <span class="ow">or</span> <span class="n">_MAXIMUM_SOURCE_URIS</span><span class="p">)</span> |
| <span class="k">if</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">custom_gcs_temp_location</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span> <span class="ow">or</span> |
| <span class="n">custom_gcs_temp_location</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span> <span class="o">=</span> <span class="n">vp</span><span class="o">.</span><span class="n">StaticValueProvider</span><span class="p">(</span> |
| <span class="nb">str</span><span class="p">,</span> <span class="n">custom_gcs_temp_location</span> <span class="ow">or</span> <span class="s1">''</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">custom_gcs_temp_location</span><span class="p">,</span> <span class="n">vp</span><span class="o">.</span><span class="n">ValueProvider</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span> <span class="o">=</span> <span class="n">custom_gcs_temp_location</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'custom_gcs_temp_location must be str or ValueProvider'</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="o">=</span> <span class="n">test_client</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_temp_file_format</span> <span class="o">=</span> <span class="n">temp_file_format</span> <span class="ow">or</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">FileFormat</span><span class="o">.</span><span class="n">JSON</span> |
| |
| <span class="c1"># If we have multiple destinations, then we will have multiple load jobs,</span> |
| <span class="c1"># thus we will need temporary tables for atomicity.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dynamic_destinations</span> <span class="o">=</span> <span class="nb">bool</span><span class="p">(</span><span class="nb">callable</span><span class="p">(</span><span class="n">destination</span><span class="p">))</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span> <span class="o">=</span> <span class="n">additional_bq_parameters</span> <span class="ow">or</span> <span class="p">{}</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">table_side_inputs</span> <span class="o">=</span> <span class="n">table_side_inputs</span> <span class="ow">or</span> <span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span> <span class="o">=</span> <span class="n">schema_side_inputs</span> <span class="ow">or</span> <span class="p">()</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="o">=</span> <span class="n">is_streaming_pipeline</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span> <span class="o">=</span> <span class="n">load_job_project_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_validate</span> <span class="o">=</span> <span class="n">validate</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_validate</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">verify</span><span class="p">()</span> |
| |
| <!-- FIX(review): two defects in the rendered source of verify():
|      1. The final check used "return ValueError(...)" — the exception was
|         constructed and returned instead of raised, so an invalid
|         with_auto_sharding configuration passed validation silently.
|         Changed to "raise", matching the two checks above it.
|      2. The adjacent string literals 'use file'/'loads' and 'with file'/
|         'loads' were missing a separating space, producing the garbled
|         messages "use fileloads in streaming" / "with fileloads in
|         streaming". A trailing space was added to the first literal of
|         each pair. -->
| <div class="viewcode-block" id="BigQueryBatchFileLoads.verify"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads.verify">[docs]</a> <span class="k">def</span> <span class="nf">verify</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="o">.</span><span class="n">get</span><span class="p">(),</span> <span class="n">vp</span><span class="o">.</span><span class="n">StaticValueProvider</span><span class="p">)</span> |
| <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="o">.</span><span class="n">get</span><span class="p">()</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">'gs://'</span><span class="p">)):</span> |
| <span class="c1"># Only fail if the custom location is provided, and it is not a GCS</span> |
| <span class="c1"># location.</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'Invalid GCS location: </span><span class="si">%r</span><span class="s1">.</span><span class="se">\n</span><span class="s1">'</span> |
| <span class="s1">'Writing to BigQuery with FILE_LOADS method requires a '</span> |
| <span class="s1">'GCS location to be provided to write files to be '</span> |
| <span class="s1">'loaded into BigQuery. Please provide a GCS bucket, or '</span> |
| <span class="s1">'pass method="STREAMING_INSERTS" to WriteToBigQuery.'</span> <span class="o">%</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="o">.</span><span class="n">get</span><span class="p">())</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'triggering_frequency must be specified to use file '</span> |
| <span class="s1">'loads in streaming'</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'triggering_frequency can only be used with file '</span> |
| <span class="s1">'loads in streaming'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">with_auto_sharding</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'with_auto_sharding can only be used with file loads in streaming.'</span><span class="p">)</span></div> |
| |
| <!-- NOTE(review): rendered _window_fn — for a streaming pipeline without
|      auto-sharding it returns GlobalWindows with a Repeatedly trigger that
|      fires on AfterAny(AfterProcessingTime(triggering_frequency),
|      AfterCount(_FILE_TRIGGERING_RECORD_COUNT)) in DISCARDING mode; every
|      other case gets a plain GlobalWindows. The only textual change below
|      is a grammar fix in the "dynamic sharding" comment. -->
| <span class="k">def</span> <span class="nf">_window_fn</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Set the correct WindowInto PTransform"""</span> |
| |
| <span class="c1"># The user-supplied triggering_frequency is often chosen to control how</span> |
| <span class="c1"># many BigQuery load jobs are triggered, to prevent going over BigQuery's</span> |
| <span class="c1"># daily quota for load jobs. If this is set to a large value, currently we</span> |
| <span class="c1"># have to buffer all the data until the trigger fires. Instead we ensure</span> |
| <span class="c1"># that the files are written if a threshold number of records are ready.</span> |
| <span class="c1"># We use only the user-supplied trigger on the actual BigQuery load.</span> |
| <span class="c1"># This allows us to offload the data to the filesystem.</span> |
| <span class="c1">#</span> |
| <span class="c1"># In the case of dynamic sharding, however, we use a default trigger since</span> |
| <span class="c1"># the transform that performs sharding also batches elements to avoid generating</span> |
| <span class="c1"># too many tiny files. User trigger is applied right after writes to limit</span> |
| <span class="c1"># the number of load jobs.</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">with_auto_sharding</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">(),</span> |
| <span class="n">trigger</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">Repeatedly</span><span class="p">(</span> |
| <span class="n">trigger</span><span class="o">.</span><span class="n">AfterAny</span><span class="p">(</span> |
| <span class="n">trigger</span><span class="o">.</span><span class="n">AfterProcessingTime</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">),</span> |
| <span class="n">trigger</span><span class="o">.</span><span class="n">AfterCount</span><span class="p">(</span> |
| <span class="n">_FILE_TRIGGERING_RECORD_COUNT</span><span class="p">))),</span> |
| <span class="n">accumulation_mode</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">AccumulationMode</span>\ |
| <span class="o">.</span><span class="n">DISCARDING</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span> |
| |
| <!-- NOTE(review): rendered _maybe_apply_user_trigger — in a streaming
|      pipeline this re-windows the (destination, file) PCollection into
|      GlobalWindows with Repeatedly(AfterAll(
|      AfterProcessingTime(triggering_frequency), AfterCount(1))) in
|      DISCARDING mode, restoring the user's trigger before load jobs are
|      issued; in batch the input is returned unchanged. Code tokens below
|      are byte-identical to the original. -->
| <span class="k">def</span> <span class="nf">_maybe_apply_user_trigger</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">destination_file_kv_pc</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_streaming_pipeline</span><span class="p">:</span> |
| <span class="c1"># Apply the user's trigger back before we start triggering load jobs</span> |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="n">destination_file_kv_pc</span> |
| <span class="o">|</span> <span class="s2">"ApplyUserTrigger"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span> |
| <span class="n">beam</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">(),</span> |
| <span class="n">trigger</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">Repeatedly</span><span class="p">(</span> |
| <span class="n">trigger</span><span class="o">.</span><span class="n">AfterAll</span><span class="p">(</span> |
| <span class="n">trigger</span><span class="o">.</span><span class="n">AfterProcessingTime</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">triggering_frequency</span><span class="p">),</span> |
| <span class="n">trigger</span><span class="o">.</span><span class="n">AfterCount</span><span class="p">(</span><span class="mi">1</span><span class="p">))),</span> |
| <span class="n">accumulation_mode</span><span class="o">=</span><span class="n">trigger</span><span class="o">.</span><span class="n">AccumulationMode</span><span class="o">.</span><span class="n">DISCARDING</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">destination_file_kv_pc</span> |
| |
| <!-- NOTE(review): rendered _write_files — writes records to temp files via
|      WriteRecordsToFile (two tagged outputs: written files and records that
|      did not fit), routes the unwritten records through _ShardDestinations +
|      GroupByKey + WriteGroupedRecordsToFile, flattens both file streams
|      (with an identity Map workaround, see linked issue), and applies the
|      user trigger. The only textual change below is a grammar fix in one
|      comment ("is written" -> "are written"). -->
| <span class="k">def</span> <span class="nf">_write_files</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">destination_data_kv_pc</span><span class="p">,</span> <span class="n">file_prefix_pcv</span><span class="p">):</span> |
| <span class="n">outputs</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">destination_data_kv_pc</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">WriteRecordsToFile</span><span class="p">(</span> |
| <span class="n">schema</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> |
| <span class="n">max_files_per_bundle</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_bundle</span><span class="p">,</span> |
| <span class="n">max_file_size</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_file_size</span><span class="p">,</span> |
| <span class="n">file_format</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_temp_file_format</span><span class="p">),</span> |
| <span class="n">file_prefix_pcv</span><span class="p">,</span> |
| <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span><span class="p">)</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span> |
| <span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">UNWRITTEN_RECORD_TAG</span><span class="p">,</span> |
| <span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">))</span> |
| |
| <span class="c1"># A PCollection of (destination, file) tuples. It lists files with records,</span> |
| <span class="c1"># and the destination each file is meant to be imported into.</span> |
| <span class="n">destination_files_kv_pc</span> <span class="o">=</span> <span class="n">outputs</span><span class="p">[</span><span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">WRITTEN_FILE_TAG</span><span class="p">]</span> |
| |
| <span class="c1"># A PCollection of (destination, record) tuples. These are later sharded,</span> |
| <span class="c1"># grouped, and all records for each destination-shard are written to files.</span> |
| <span class="c1"># This PCollection is necessary because not all records can be written into</span> |
| <span class="c1"># files in ``WriteRecordsToFile``.</span> |
| <span class="n">unwritten_records_pc</span> <span class="o">=</span> <span class="n">outputs</span><span class="p">[</span><span class="n">WriteRecordsToFile</span><span class="o">.</span><span class="n">UNWRITTEN_RECORD_TAG</span><span class="p">]</span> |
| |
| <span class="n">more_destination_files_kv_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">unwritten_records_pc</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">_ShardDestinations</span><span class="p">())</span> |
| <span class="o">|</span> <span class="s2">"GroupShardedRows"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span> |
| <span class="o">|</span> <span class="s2">"DropShardNumber"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="n">x</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">],</span> <span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">]))</span> |
| <span class="o">|</span> <span class="s2">"WriteGroupedRecordsToFile"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">WriteGroupedRecordsToFile</span><span class="p">(</span> |
| <span class="n">schema</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="n">file_format</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_temp_file_format</span><span class="p">),</span> |
| <span class="n">file_prefix_pcv</span><span class="p">,</span> |
| <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span><span class="p">))</span> |
| |
| <span class="c1"># TODO(https://github.com/apache/beam/issues/20285): Remove the identity</span> |
| <span class="c1"># transform. We flatten both PCollection paths and use an identity function</span> |
| <span class="c1"># to work around a flatten optimization issue where the wrong coder is</span> |
| <span class="c1"># being used.</span> |
| <span class="n">all_destination_file_pairs_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="p">(</span><span class="n">destination_files_kv_pc</span><span class="p">,</span> <span class="n">more_destination_files_kv_pc</span><span class="p">)</span> |
| <span class="o">|</span> <span class="s2">"DestinationFilesUnion"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">()</span> |
| <span class="o">|</span> <span class="s2">"IdentityWorkaround"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">))</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_maybe_apply_user_trigger</span><span class="p">(</span><span class="n">all_destination_file_pairs_pc</span><span class="p">)</span> |
| |
| <!-- NOTE(review): rendered _write_files_with_auto_sharding — converts table
|      references to hashable keys, batches rows per destination with
|      GroupIntoBatches.WithShardedKey (record-count and buffering-duration
|      limits, injectable clock for tests), restores the table references,
|      writes each batch via WriteGroupedRecordsToFile, then applies the user
|      trigger. The only textual change below is removing a stray sentence
|      break in one comment ("used by." -> "used by", continuing onto the
|      next line). -->
| <span class="k">def</span> <span class="nf">_write_files_with_auto_sharding</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="n">destination_data_kv_pc</span><span class="p">,</span> <span class="n">file_prefix_pcv</span><span class="p">):</span> |
| <span class="n">clock</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="o">.</span><span class="n">test_clock</span> <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">test_client</span> <span class="k">else</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span> |
| |
| <span class="c1"># Auto-sharding is achieved via GroupIntoBatches.WithShardedKey</span> |
| <span class="c1"># transform which shards, groups and at the same time batches the table rows</span> |
| <span class="c1"># to be inserted to BigQuery.</span> |
| |
| <span class="c1"># Firstly, the keys of tagged_data (table references) are converted to a</span> |
| <span class="c1"># hashable format. This is needed to work with the keyed states used by</span> |
| <span class="c1"># GroupIntoBatches. After grouping and batching is done, table references</span> |
| <span class="c1"># are restored.</span> |
| <span class="n">destination_files_kv_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">destination_data_kv_pc</span> |
| <span class="o">|</span> |
| <span class="s1">'ToHashableTableRef'</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="n">bigquery_tools</span><span class="o">.</span><span class="n">to_hashable_table_ref</span><span class="p">)</span> |
| <span class="o">|</span> <span class="s1">'WithAutoSharding'</span> <span class="o">>></span> <span class="n">GroupIntoBatches</span><span class="o">.</span><span class="n">WithShardedKey</span><span class="p">(</span> |
| <span class="n">batch_size</span><span class="o">=</span><span class="n">_FILE_TRIGGERING_RECORD_COUNT</span><span class="p">,</span> |
| <span class="n">max_buffering_duration_secs</span><span class="o">=</span><span class="n">_FILE_TRIGGERING_BATCHING_DURATION_SECS</span><span class="p">,</span> |
| <span class="n">clock</span><span class="o">=</span><span class="n">clock</span><span class="p">)</span> |
| <span class="o">|</span> <span class="s1">'FromHashableTableRefAndDropShard'</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span> |
| <span class="k">lambda</span> <span class="n">kvs</span><span class="p">:</span> |
| <span class="p">(</span><span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">kvs</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">key</span><span class="p">),</span> <span class="n">kvs</span><span class="p">[</span><span class="mi">1</span><span class="p">]))</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">WriteGroupedRecordsToFile</span><span class="p">(</span> |
| <span class="n">schema</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="n">file_format</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_temp_file_format</span><span class="p">),</span> |
| <span class="n">file_prefix_pcv</span><span class="p">,</span> |
| <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span><span class="p">))</span> |
| |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_maybe_apply_user_trigger</span><span class="p">(</span><span class="n">destination_files_kv_pc</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">_load_data</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">partitions_using_temp_tables</span><span class="p">,</span> |
| <span class="n">partitions_direct_to_destination</span><span class="p">,</span> |
| <span class="n">load_job_name_pcv</span><span class="p">,</span> |
| <span class="n">schema_mod_job_name_pcv</span><span class="p">,</span> |
| <span class="n">copy_job_name_pcv</span><span class="p">,</span> |
| <span class="n">p</span><span class="p">,</span> |
| <span class="n">step_name</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Load data to BigQuery</span> |
| |
| <span class="sd"> Data is loaded into BigQuery in the following two ways:</span> |
| <span class="sd"> 1. Single partition:</span> |
| <span class="sd"> When there is a single partition of files destined to a single</span> |
| <span class="sd"> destination, a single load job is triggered.</span> |
| <span class="sd"> 2. Multiple partitions and/or Dynamic Destinations:</span> |
| <span class="sd"> When there are multiple partitions of files destined for a single</span> |
| <span class="sd"> destination or when Dynamic Destinations are used, multiple load jobs</span> |
| <span class="sd"> need to be triggered for each partition/destination. Load Jobs are</span> |
| <span class="sd"> triggered to temporary tables, and those are later copied to the actual</span> |
| <span class="sd"> appropriate destination table. This ensures atomicity when only some</span> |
| <span class="sd"> of the load jobs would fail but not others. If any of them fails, then</span> |
| <span class="sd"> copy jobs are not triggered.</span> |
| <span class="sd"> """</span> |
| <span class="c1"># Load data using temp tables</span> |
| <span class="n">trigger_loads_outputs</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">partitions_using_temp_tables</span> |
| <span class="o">|</span> <span class="s2">"TriggerLoadJobsWithTempTables"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">TriggerLoadJobs</span><span class="p">(</span> |
| <span class="n">schema</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> |
| <span class="n">project</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span> |
| <span class="n">test_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">,</span> |
| <span class="n">temporary_tables</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">additional_bq_parameters</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">,</span> |
| <span class="n">source_format</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_temp_file_format</span><span class="p">,</span> |
| <span class="n">step_name</span><span class="o">=</span><span class="n">step_name</span><span class="p">,</span> |
| <span class="n">load_job_project_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span><span class="p">),</span> |
| <span class="n">load_job_name_pcv</span><span class="p">,</span> |
| <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span><span class="p">)</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span> |
| <span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">TEMP_TABLES</span><span class="p">,</span> |
| <span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">ONGOING_JOBS</span><span class="p">,</span> |
| <span class="n">main</span><span class="o">=</span><span class="s1">'main'</span><span class="p">))</span> |
| |
| <span class="n">finished_temp_tables_load_job_ids_pc</span> <span class="o">=</span> <span class="n">trigger_loads_outputs</span><span class="p">[</span><span class="s1">'main'</span><span class="p">]</span> |
| <span class="n">temp_tables_load_job_ids_pc</span> <span class="o">=</span> <span class="n">trigger_loads_outputs</span><span class="p">[</span> |
| <span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">ONGOING_JOBS</span><span class="p">]</span> |
| <span class="n">temp_tables_pc</span> <span class="o">=</span> <span class="n">trigger_loads_outputs</span><span class="p">[</span><span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">TEMP_TABLES</span><span class="p">]</span> |
| |
| <span class="n">schema_mod_job_ids_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">finished_temp_tables_load_job_ids_pc</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">UpdateDestinationSchema</span><span class="p">(</span> |
| <span class="n">project</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span> |
| <span class="n">test_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">,</span> |
| <span class="n">additional_bq_parameters</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">,</span> |
| <span class="n">step_name</span><span class="o">=</span><span class="n">step_name</span><span class="p">,</span> |
| <span class="n">load_job_project_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span><span class="p">),</span> |
| <span class="n">schema_mod_job_name_pcv</span><span class="p">))</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span> <span class="o">==</span> <span class="s1">'WRITE_TRUNCATE'</span><span class="p">:</span> |
| <span class="c1"># All loads going to the same table must be processed together so that</span> |
| <span class="c1"># the truncation happens only once. See BEAM-24535.</span> |
| <span class="n">finished_temp_tables_load_job_ids_list_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">finished_temp_tables_load_job_ids_pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">MapTuple</span><span class="p">(</span> |
| <span class="k">lambda</span> <span class="n">destination</span><span class="p">,</span> |
| <span class="n">job_reference</span><span class="p">:</span> <span class="p">(</span> |
| <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">parse_table_reference</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span><span class="o">.</span><span class="n">tableId</span><span class="p">,</span> |
| <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">job_reference</span><span class="p">)))</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">MapTuple</span><span class="p">(</span><span class="k">lambda</span> <span class="n">tableId</span><span class="p">,</span> <span class="n">batch</span><span class="p">:</span> <span class="nb">list</span><span class="p">(</span><span class="n">batch</span><span class="p">)))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># Loads can happen in parallel.</span> |
| <span class="n">finished_temp_tables_load_job_ids_list_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">finished_temp_tables_load_job_ids_pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">[</span><span class="n">x</span><span class="p">]))</span> |
| |
| <span class="n">copy_job_outputs</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">finished_temp_tables_load_job_ids_list_pc</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">TriggerCopyJobs</span><span class="p">(</span> |
| <span class="n">project</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span> |
| <span class="n">test_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">,</span> |
| <span class="n">step_name</span><span class="o">=</span><span class="n">step_name</span><span class="p">,</span> |
| <span class="n">load_job_project_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span><span class="p">),</span> |
| <span class="n">copy_job_name_pcv</span><span class="p">,</span> |
| <span class="n">pvalue</span><span class="o">.</span><span class="n">AsIter</span><span class="p">(</span><span class="n">schema_mod_job_ids_pc</span><span class="p">))</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span> |
| <span class="n">TriggerCopyJobs</span><span class="o">.</span><span class="n">TRIGGER_DELETE_TEMP_TABLES</span><span class="p">,</span> <span class="n">main</span><span class="o">=</span><span class="s1">'main'</span><span class="p">))</span> |
| |
| <span class="n">destination_copy_job_ids_pc</span> <span class="o">=</span> <span class="n">copy_job_outputs</span><span class="p">[</span><span class="s1">'main'</span><span class="p">]</span> |
| <span class="n">trigger_delete</span> <span class="o">=</span> <span class="n">copy_job_outputs</span><span class="p">[</span> |
| <span class="n">TriggerCopyJobs</span><span class="o">.</span><span class="n">TRIGGER_DELETE_TEMP_TABLES</span><span class="p">]</span> |
| |
| <span class="n">_</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">temp_tables_pc</span> |
| <span class="o">|</span> <span class="s2">"RemoveTempTables/AddUselessValue"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span> |
| <span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">unused_trigger</span><span class="p">:</span> <span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsList</span><span class="p">(</span><span class="n">trigger_delete</span><span class="p">))</span> |
| <span class="o">|</span> <span class="s2">"RemoveTempTables/DeduplicateTables"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span> |
| <span class="o">|</span> <span class="s2">"RemoveTempTables/GetTableNames"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Keys</span><span class="p">()</span> |
| <span class="o">|</span> <span class="s2">"RemoveTempTables/Delete"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">DeleteTablesFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">)))</span> |
| |
| <span class="c1"># Load data directly to destination table</span> |
| <span class="n">destination_load_job_ids_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">partitions_direct_to_destination</span> |
| <span class="o">|</span> <span class="s2">"TriggerLoadJobsWithoutTempTables"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">TriggerLoadJobs</span><span class="p">(</span> |
| <span class="n">schema</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> |
| <span class="n">write_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">write_disposition</span><span class="p">,</span> |
| <span class="n">create_disposition</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">create_disposition</span><span class="p">,</span> |
| <span class="n">test_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">test_client</span><span class="p">,</span> |
| <span class="n">temporary_tables</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">additional_bq_parameters</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">additional_bq_parameters</span><span class="p">,</span> |
| <span class="n">source_format</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_temp_file_format</span><span class="p">,</span> |
| <span class="n">step_name</span><span class="o">=</span><span class="n">step_name</span><span class="p">,</span> |
| <span class="n">load_job_project_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">load_job_project_id</span><span class="p">),</span> |
| <span class="n">load_job_name_pcv</span><span class="p">,</span> |
| <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">schema_side_inputs</span><span class="p">)</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span> |
| <span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">ONGOING_JOBS</span><span class="p">,</span> <span class="n">main</span><span class="o">=</span><span class="s1">'main'</span><span class="p">)</span> |
| <span class="p">)[</span><span class="n">TriggerLoadJobs</span><span class="o">.</span><span class="n">ONGOING_JOBS</span><span class="p">]</span> |
| |
| <span class="n">destination_load_job_ids_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="p">(</span><span class="n">temp_tables_load_job_ids_pc</span><span class="p">,</span> <span class="n">destination_load_job_ids_pc</span><span class="p">)</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">())</span> |
| |
| <span class="k">return</span> <span class="n">destination_load_job_ids_pc</span><span class="p">,</span> <span class="n">destination_copy_job_ids_pc</span> |
| |
| <div class="viewcode-block" id="BigQueryBatchFileLoads.expand"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.bigquery_file_loads.html#apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="n">p</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">project</span> <span class="ow">or</span> <span class="n">p</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">project</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">step_name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">label</span> |
| <span class="k">except</span> <span class="ne">AttributeError</span><span class="p">:</span> |
| <span class="n">step_name</span> <span class="o">=</span> <span class="s1">'BigQueryBatchFileLoads_</span><span class="si">%d</span><span class="s1">'</span> <span class="o">%</span> <span class="n">BigQueryBatchFileLoads</span><span class="o">.</span><span class="n">COUNT</span> |
| <span class="n">BigQueryBatchFileLoads</span><span class="o">.</span><span class="n">COUNT</span> <span class="o">+=</span> <span class="mi">1</span> |
| |
| <span class="n">temp_location</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">temp_location</span> |
| <span class="n">job_name</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">p</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">job_name</span> <span class="ow">or</span> <span class="s1">'AUTOMATIC_JOB_NAME'</span><span class="p">)</span> |
| |
| <span class="n">empty_pc</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="s2">"ImpulseEmptyPC"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([])</span> |
| <span class="n">singleton_pc</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="s2">"ImpulseSingleElementPC"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span> |
| |
| <span class="n">load_job_name_pcv</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span> |
| <span class="n">singleton_pc</span> |
| <span class="o">|</span> <span class="s2">"LoadJobNamePrefix"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span> |
| <span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="n">_generate_job_name</span><span class="p">(</span> |
| <span class="n">job_name</span><span class="p">,</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryJobTypes</span><span class="o">.</span><span class="n">LOAD</span><span class="p">,</span> <span class="s1">'LOAD_STEP'</span><span class="p">)))</span> |
| |
| <span class="n">schema_mod_job_name_pcv</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span> |
| <span class="n">singleton_pc</span> |
| <span class="o">|</span> <span class="s2">"SchemaModJobNamePrefix"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span> |
| <span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="n">_generate_job_name</span><span class="p">(</span> |
| <span class="n">job_name</span><span class="p">,</span> |
| <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryJobTypes</span><span class="o">.</span><span class="n">LOAD</span><span class="p">,</span> |
| <span class="s1">'SCHEMA_MOD_STEP'</span><span class="p">)))</span> |
| |
| <span class="n">copy_job_name_pcv</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span> |
| <span class="n">singleton_pc</span> |
| <span class="o">|</span> <span class="s2">"CopyJobNamePrefix"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span> |
| <span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="n">_generate_job_name</span><span class="p">(</span> |
| <span class="n">job_name</span><span class="p">,</span> <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">BigQueryJobTypes</span><span class="o">.</span><span class="n">COPY</span><span class="p">,</span> <span class="s1">'COPY_STEP'</span><span class="p">)))</span> |
| |
| <span class="n">file_prefix_pcv</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span> |
| <span class="n">singleton_pc</span> |
| <span class="o">|</span> <span class="s2">"GenerateFilePrefix"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span> |
| <span class="n">file_prefix_generator</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_validate</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_custom_gcs_temp_location</span><span class="p">,</span> <span class="n">temp_location</span><span class="p">)))</span> |
| |
| <span class="n">destination_data_kv_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">pcoll</span> |
| <span class="o">|</span> <span class="s2">"RewindowIntoGlobal"</span> <span class="o">>></span> <span class="bp">self</span><span class="o">.</span><span class="n">_window_fn</span><span class="p">()</span> |
| <span class="o">|</span> <span class="s2">"AppendDestination"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">bigquery_tools</span><span class="o">.</span><span class="n">AppendDestinationsFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">destination</span><span class="p">),</span> |
| <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">table_side_inputs</span><span class="p">))</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">with_auto_sharding</span><span class="p">:</span> |
| <span class="n">all_destination_file_pairs_pc</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_write_files</span><span class="p">(</span> |
| <span class="n">destination_data_kv_pc</span><span class="p">,</span> <span class="n">file_prefix_pcv</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">all_destination_file_pairs_pc</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_write_files_with_auto_sharding</span><span class="p">(</span> |
| <span class="n">destination_data_kv_pc</span><span class="p">,</span> <span class="n">file_prefix_pcv</span><span class="p">)</span> |
| |
| <span class="n">grouped_files_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">all_destination_file_pairs_pc</span> |
| <span class="o">|</span> <span class="s2">"GroupFilesByTableDestinations"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">())</span> |
| |
| <span class="n">partitions</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">grouped_files_pc</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">PartitionFiles</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_partition_size</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_files_per_partition</span><span class="p">))</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span> |
| <span class="n">PartitionFiles</span><span class="o">.</span><span class="n">MULTIPLE_PARTITIONS_TAG</span><span class="p">,</span> |
| <span class="n">PartitionFiles</span><span class="o">.</span><span class="n">SINGLE_PARTITION_TAG</span><span class="p">))</span> |
| |
| <span class="n">multiple_partitions_per_destination_pc</span> <span class="o">=</span> <span class="n">partitions</span><span class="p">[</span> |
| <span class="n">PartitionFiles</span><span class="o">.</span><span class="n">MULTIPLE_PARTITIONS_TAG</span><span class="p">]</span> |
| <span class="n">single_partition_per_destination_pc</span> <span class="o">=</span> <span class="n">partitions</span><span class="p">[</span> |
| <span class="n">PartitionFiles</span><span class="o">.</span><span class="n">SINGLE_PARTITION_TAG</span><span class="p">]</span> |
| |
| <span class="c1"># When using dynamic destinations, elements with both single as well as</span> |
| <span class="c1"># multiple partitions are loaded into BigQuery using temporary tables to</span> |
| <span class="c1"># ensure atomicity.</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">dynamic_destinations</span><span class="p">:</span> |
| <span class="n">all_partitions</span> <span class="o">=</span> <span class="p">((</span> |
| <span class="n">multiple_partitions_per_destination_pc</span><span class="p">,</span> |
| <span class="n">single_partition_per_destination_pc</span><span class="p">)</span> |
| <span class="o">|</span> <span class="s2">"FlattenPartitions"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">())</span> |
| <span class="n">destination_load_job_ids_pc</span><span class="p">,</span> <span class="n">destination_copy_job_ids_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_load_data</span><span class="p">(</span><span class="n">all_partitions</span><span class="p">,</span> |
| <span class="n">empty_pc</span><span class="p">,</span> |
| <span class="n">load_job_name_pcv</span><span class="p">,</span> |
| <span class="n">schema_mod_job_name_pcv</span><span class="p">,</span> |
| <span class="n">copy_job_name_pcv</span><span class="p">,</span> |
| <span class="n">p</span><span class="p">,</span> |
| <span class="n">step_name</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">destination_load_job_ids_pc</span><span class="p">,</span> <span class="n">destination_copy_job_ids_pc</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_load_data</span><span class="p">(</span><span class="n">multiple_partitions_per_destination_pc</span><span class="p">,</span> |
| <span class="n">single_partition_per_destination_pc</span><span class="p">,</span> |
| <span class="n">load_job_name_pcv</span><span class="p">,</span> |
| <span class="n">schema_mod_job_name_pcv</span><span class="p">,</span> |
| <span class="n">copy_job_name_pcv</span><span class="p">,</span> |
| <span class="n">p</span><span class="p">,</span> |
| <span class="n">step_name</span><span class="p">))</span> |
| |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">DESTINATION_JOBID_PAIRS</span><span class="p">:</span> <span class="n">destination_load_job_ids_pc</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">DESTINATION_FILE_PAIRS</span><span class="p">:</span> <span class="n">all_destination_file_pairs_pc</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">DESTINATION_COPY_JOBID_PAIRS</span><span class="p">:</span> <span class="n">destination_copy_job_ids_pc</span><span class="p">,</span> |
| <span class="p">}</span></div></div> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| <footer> |
| |
| |
      <hr>
| |
| <div role="contentinfo"> |
| <p> |
| © Copyright |
| |
| </p> |
| </div> |
        Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. 
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
  <!-- Theme boilerplate emitted by sphinx_rtd_theme: once the DOM is ready,
       enable the theme's sidebar navigation behavior (sticky sidebar and
       scroll synchronization). `true` enables the sticky-nav mode. -->
  <script type="text/javascript">
      jQuery(function () {
          SphinxRtdTheme.Navigation.enable(true);
      });
  </script>
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |