<!-- blob: f423c8fe8611dbf2c71718b82628494bd1b6d441 [file] [log] [blame] -->
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <!-- Page metadata: encoding, responsive viewport and the document title. -->
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>apache_beam.io.gcp.experimental.spannerio &mdash; Apache Beam 2.38.0 documentation</title>

  <!-- Scripts. Non-async classic scripts execute in document order, so the
       original ordering is kept exactly as it was. -->
  <script src="../../../../../_static/js/modernizr.min.js"></script>
  <script id="documentation_options" data-url_root="../../../../../" src="../../../../../_static/documentation_options.js"></script>
  <script src="../../../../../_static/jquery.js"></script>
  <script src="../../../../../_static/underscore.js"></script>
  <script src="../../../../../_static/doctools.js"></script>
  <script src="../../../../../_static/language_data.js"></script>
  <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
  <script src="../../../../../_static/js/theme.js"></script>

  <!-- Stylesheets. -->
  <link rel="stylesheet" href="../../../../../_static/css/theme.css">
  <link rel="stylesheet" href="../../../../../_static/pygments.css">

  <!-- Document relations: general index and search page. -->
  <link rel="index" title="Index" href="../../../../../genindex.html">
  <link rel="search" title="Search" href="../../../../../search.html">
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../../../index.html" class="icon icon-home"> Apache Beam
</a>
<div class="version">
2.38.0
</div>
<div role="search">
  <!-- Documentation search: submits the query as a GET request to search.html. -->
  <form id="rtd-search-form" class="wy-form" action="../../../../../search.html" method="get">
    <!-- A placeholder is not an accessible name, so aria-label supplies one
         for assistive technology. type="text" is kept unchanged because
         theme styling may depend on it. -->
    <input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
    <input type="hidden" name="check_keywords" value="yes" />
    <input type="hidden" name="area" value="default" />
  </form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<!-- Sidebar table of contents, part 1: links to the top-level apache_beam package pages. -->
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.ml.html">apache_beam.ml package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.utils.html">apache_beam.utils package</a></li>
</ul>
<!-- Sidebar table of contents, part 2: links to the top-level apache_beam module pages. -->
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<!-- Top navigation bar: an icon element followed by a link back to the documentation home page. -->
<nav class="wy-nav-top" aria-label="top navigation">
<!-- NOTE(review): presumably the theme script binds a click handler to this data-toggle hook to open the side menu; confirm.
     An <i> element is not focusable, so if this is the only toggle it cannot be reached from the keyboard. -->
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../../../index.html">Module code</a> &raquo;</li>
<li>apache_beam.io.gcp.experimental.spannerio</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.io.gcp.experimental.spannerio</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Google Cloud Spanner IO</span>
<span class="sd">Experimental; no backwards-compatibility guarantees.</span>
<span class="sd">This is an experimental module for reading and writing data from Google Cloud</span>
<span class="sd">Spanner. Visit: https://cloud.google.com/spanner for more details.</span>
<span class="sd">Reading Data from Cloud Spanner.</span>
<span class="sd">To read from Cloud Spanner apply ReadFromSpanner transformation. It will</span>
<span class="sd">return a PCollection, where each element represents an individual row returned</span>
<span class="sd">from the read operation. Both Query and Read APIs are supported.</span>
<span class="sd">ReadFromSpanner relies on the ReadOperation objects which is exposed by the</span>
<span class="sd">SpannerIO API. ReadOperation holds the immutable data which is responsible to</span>
<span class="sd">execute batch and naive reads on Cloud Spanner. This is done for more</span>
<span class="sd">convenient programming.</span>
<span class="sd">ReadFromSpanner reads from Cloud Spanner by providing either an &#39;sql&#39; param</span>
<span class="sd">in the constructor or &#39;table&#39; name with &#39;columns&#39; as list. For example:::</span>
<span class="sd"> records = (pipeline</span>
<span class="sd"> | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,</span>
<span class="sd"> sql=&#39;Select * from users&#39;))</span>
<span class="sd"> records = (pipeline</span>
<span class="sd"> | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,</span>
<span class="sd"> table=&#39;users&#39;, columns=[&#39;id&#39;, &#39;name&#39;, &#39;email&#39;]))</span>
<span class="sd">You can also perform multiple reads by providing a list of ReadOperations</span>
<span class="sd">to the ReadFromSpanner transform constructor. ReadOperation exposes two static</span>
<span class="sd">methods. Use &#39;query&#39; to perform sql based reads, &#39;table&#39; to perform read from</span>
<span class="sd">table name. For example:::</span>
<span class="sd"> read_operations = [</span>
<span class="sd"> ReadOperation.table(table=&#39;customers&#39;, columns=[&#39;name&#39;,</span>
<span class="sd"> &#39;email&#39;]),</span>
<span class="sd"> ReadOperation.table(table=&#39;vendors&#39;, columns=[&#39;name&#39;,</span>
<span class="sd"> &#39;email&#39;]),</span>
<span class="sd"> ]</span>
<span class="sd"> all_users = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,</span>
<span class="sd"> read_operations=read_operations)</span>
<span class="sd"> ...OR...</span>
<span class="sd"> read_operations = [</span>
<span class="sd"> ReadOperation.query(sql=&#39;Select name, email from</span>
<span class="sd"> customers&#39;),</span>
<span class="sd"> ReadOperation.query(</span>
<span class="sd"> sql=&#39;Select * from users where id &lt;= @user_id&#39;,</span>
<span class="sd"> params={&#39;user_id&#39;: 100},</span>
<span class="sd"> param_types={&#39;user_id&#39;: param_types.INT64}</span>
<span class="sd"> ),</span>
<span class="sd"> ]</span>
<span class="sd"> # `param_types` values come from `google.cloud.spanner.param_types`</span>
<span class="sd"> all_users = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,</span>
<span class="sd"> read_operations=read_operations)</span>
<span class="sd">For more information, please review the docs on class ReadOperation.</span>
<span class="sd">Users can also provide the ReadOperation in the form of a PCollection via</span>
<span class="sd">pipeline. For example:::</span>
<span class="sd"> users = (pipeline</span>
<span class="sd"> | beam.Create([ReadOperation...])</span>
<span class="sd"> | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME))</span>
<span class="sd">User may also create cloud spanner transaction from the transform called</span>
<span class="sd">`create_transaction` which is available in the SpannerIO API.</span>
<span class="sd">The transform is guaranteed to be executed on a consistent snapshot of data,</span>
<span class="sd">utilizing the power of read only transactions. Staleness of data can be</span>
<span class="sd">controlled by providing the `read_timestamp` or `exact_staleness` param values</span>
<span class="sd">in the constructor.</span>
<span class="sd">This transform requires root of the pipeline (PBegin) and returns PTransform</span>
<span class="sd">which is passed later to the `ReadFromSpanner` constructor. `ReadFromSpanner`</span>
<span class="sd">pass this transaction PTransform as a singleton side input to the</span>
<span class="sd">`_NaiveSpannerReadDoFn` containing &#39;session_id&#39; and &#39;transaction_id&#39;.</span>
<span class="sd">For example:::</span>
<span class="sd"> transaction = (pipeline | create_transaction(TEST_PROJECT_ID,</span>
<span class="sd"> TEST_INSTANCE_ID,</span>
<span class="sd"> DB_NAME))</span>
<span class="sd"> users = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,</span>
<span class="sd"> sql=&#39;Select * from users&#39;, transaction=transaction)</span>
<span class="sd"> tweets = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,</span>
<span class="sd"> sql=&#39;Select * from tweets&#39;, transaction=transaction)</span>
<span class="sd">For further details of this transform, please review the docs on the</span>
<span class="sd">:meth:`create_transaction` method available in the SpannerIO API.</span>
<span class="sd">ReadFromSpanner takes this transform in the constructor and pass this to the</span>
<span class="sd">read pipeline as the singleton side input.</span>
<span class="sd">Writing Data to Cloud Spanner.</span>
<span class="sd">The WriteToSpanner transform writes to Cloud Spanner by executing a</span>
<span class="sd">collection of input rows (WriteMutation). The mutations are grouped into</span>
<span class="sd">batches for efficiency.</span>
<span class="sd">WriteToSpanner transform relies on the WriteMutation objects which is exposed</span>
<span class="sd">by the SpannerIO API. WriteMutation have five static methods (insert, update,</span>
<span class="sd">insert_or_update, replace, delete). These methods returns the instance of the</span>
<span class="sd">_Mutator object which contains the mutation type and the Spanner Mutation</span>
<span class="sd">object. For more details, review the docs of the class SpannerIO.WriteMutation.</span>
<span class="sd">For example:::</span>
<span class="sd"> mutations = [</span>
<span class="sd"> WriteMutation.insert(table=&#39;user&#39;, columns=(&#39;name&#39;, &#39;email&#39;),</span>
<span class="sd"> values=[(&#39;sara&#39;, &#39;sara@dev.com&#39;)])</span>
<span class="sd"> ]</span>
<span class="sd"> _ = (p</span>
<span class="sd"> | beam.Create(mutations)</span>
<span class="sd"> | WriteToSpanner(</span>
<span class="sd"> project_id=SPANNER_PROJECT_ID,</span>
<span class="sd"> instance_id=SPANNER_INSTANCE_ID,</span>
<span class="sd"> database_id=SPANNER_DATABASE_NAME)</span>
<span class="sd"> )</span>
<span class="sd">You can also create WriteMutation via calling its constructor. For example:::</span>
<span class="sd"> mutations = [</span>
<span class="sd"> WriteMutation(insert=&#39;users&#39;, columns=(&#39;name&#39;, &#39;email&#39;),</span>
<span class="sd"> values=[(&#39;sara&#39;, &#39;sara@example.com&#39;)])</span>
<span class="sd"> ]</span>
<span class="sd">For more information, review the docs available on WriteMutation class.</span>
<span class="sd">WriteToSpanner transform also takes three batching parameters (max_number_rows,</span>
<span class="sd">max_number_cells and max_batch_size_bytes). By default, max_number_rows is set</span>
<span class="sd">to 50 rows, max_number_cells is set to 500 cells and max_batch_size_bytes is</span>
<span class="sd">set to 1MB (1048576 bytes). These parameters are used to reduce the number of</span>
<span class="sd">transactions sent to Spanner by grouping the mutations into batches. Set</span>
<span class="sd">these param values to a smaller value, or to zero, to disable batching.</span>
<span class="sd">Unlike the Java connector, this connector does not create batches of</span>
<span class="sd">transactions sorted by table and primary key.</span>
<span class="sd">WriteToSpanner transforms starts with the grouping into batches. The first step</span>
<span class="sd">in this process is to make the mutation groups of the WriteMutation</span>
<span class="sd">objects and then filtering them into batchable and unbatchable mutation</span>
<span class="sd">groups. There are three batching parameters (max_number_cells, max_number_rows</span>
<span class="sd">&amp; max_batch_size_bytes). We calculate the mutation byte size using the method</span>
<span class="sd">available in `google.cloud.spanner_v1.proto.mutation_pb2.Mutation.ByteSize`.</span>
<span class="sd">If the mutation rows, cells or byte size are larger than the value of any</span>
<span class="sd">batching parameter, it will be tagged as an &quot;unbatchable&quot; mutation. After</span>
<span class="sd">this, all the batchable mutations are merged into a single mutation group whose</span>
<span class="sd">size is not larger than &quot;max_batch_size_bytes&quot;. After this process, all the</span>
<span class="sd">mutation groups are processed together. If a Mutation references a table or</span>
<span class="sd">column that does not exist, it will cause an exception and fail the entire pipeline.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="kn">import</span> <span class="nn">typing</span>
<span class="kn">from</span> <span class="nn">collections</span> <span class="kn">import</span> <span class="n">deque</span>
<span class="kn">from</span> <span class="nn">collections</span> <span class="kn">import</span> <span class="n">namedtuple</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">Create</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">DoFn</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">Flatten</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">ParDo</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">Reshuffle</span>
<span class="kn">from</span> <span class="nn">apache_beam.internal.metrics.metric</span> <span class="kn">import</span> <span class="n">ServiceCallMetric</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp</span> <span class="kn">import</span> <span class="n">resource_identifiers</span>
<span class="kn">from</span> <span class="nn">apache_beam.metrics</span> <span class="kn">import</span> <span class="n">Metrics</span>
<span class="kn">from</span> <span class="nn">apache_beam.metrics</span> <span class="kn">import</span> <span class="n">monitoring_infos</span>
<span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="kn">import</span> <span class="n">AsSingleton</span>
<span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="kn">import</span> <span class="n">PBegin</span>
<span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="kn">import</span> <span class="n">TaggedOutput</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">PTransform</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">ptransform_fn</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">window</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="kn">import</span> <span class="n">DisplayDataItem</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="kn">import</span> <span class="n">with_input_types</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="kn">import</span> <span class="n">with_output_types</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils.annotations</span> <span class="kn">import</span> <span class="n">experimental</span>
<span class="c1"># Protect against environments where spanner library is not available.</span>
<span class="c1"># pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports</span>
<span class="c1"># pylint: disable=unused-import</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">google.cloud.spanner</span> <span class="kn">import</span> <span class="n">Client</span>
<span class="kn">from</span> <span class="nn">google.cloud.spanner</span> <span class="kn">import</span> <span class="n">KeySet</span>
<span class="kn">from</span> <span class="nn">google.cloud.spanner_v1</span> <span class="kn">import</span> <span class="n">batch</span>
<span class="kn">from</span> <span class="nn">google.cloud.spanner_v1.database</span> <span class="kn">import</span> <span class="n">BatchSnapshot</span>
<span class="kn">from</span> <span class="nn">google.api_core.exceptions</span> <span class="kn">import</span> <span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span>
<span class="kn">from</span> <span class="nn">apitools.base.py.exceptions</span> <span class="kn">import</span> <span class="n">HttpError</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="n">Client</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">KeySet</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">BatchSnapshot</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">google.cloud.spanner_v1</span> <span class="kn">import</span> <span class="n">Mutation</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="c1"># Remove this and the try clause when we upgrade to google-cloud-spanner</span>
<span class="c1"># 3.x.x.</span>
<span class="kn">from</span> <span class="nn">google.cloud.spanner_v1.proto.mutation_pb2</span> <span class="kn">import</span> <span class="n">Mutation</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="c1"># Ignoring for environments where the Spanner library is not available.</span>
<span class="k">pass</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span>
<span class="s1">&#39;create_transaction&#39;</span><span class="p">,</span>
<span class="s1">&#39;ReadFromSpanner&#39;</span><span class="p">,</span>
<span class="s1">&#39;ReadOperation&#39;</span><span class="p">,</span>
<span class="s1">&#39;WriteToSpanner&#39;</span><span class="p">,</span>
<span class="s1">&#39;WriteMutation&#39;</span><span class="p">,</span>
<span class="s1">&#39;MutationGroup&#39;</span>
<span class="p">]</span>
<span class="k">class</span> <span class="nc">_SPANNER_TRANSACTION</span><span class="p">(</span><span class="n">namedtuple</span><span class="p">(</span><span class="s2">&quot;SPANNER_TRANSACTION&quot;</span><span class="p">,</span> <span class="p">[</span><span class="s2">&quot;transaction&quot;</span><span class="p">])):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Holds the spanner transaction details.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="vm">__slots__</span> <span class="o">=</span> <span class="p">()</span>
<div class="viewcode-block" id="ReadOperation"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.ReadOperation">[docs]</a><span class="k">class</span> <span class="nc">ReadOperation</span><span class="p">(</span><span class="n">namedtuple</span><span class="p">(</span>
<span class="s2">&quot;ReadOperation&quot;</span><span class="p">,</span> <span class="p">[</span><span class="s2">&quot;is_sql&quot;</span><span class="p">,</span> <span class="s2">&quot;is_table&quot;</span><span class="p">,</span> <span class="s2">&quot;read_operation&quot;</span><span class="p">,</span> <span class="s2">&quot;kwargs&quot;</span><span class="p">])):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Encapsulates a spanner read operation.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="vm">__slots__</span> <span class="o">=</span> <span class="p">()</span>
<div class="viewcode-block" id="ReadOperation.query"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.ReadOperation.query">[docs]</a> <span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">query</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">sql</span><span class="p">,</span> <span class="n">params</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">param_types</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A convenient method to construct ReadOperation from sql query.</span>
<span class="sd"> Args:</span>
<span class="sd"> sql: SQL query statement</span>
<span class="sd"> params: (optional) values for parameter replacement. Keys must match the</span>
<span class="sd"> names used in sql</span>
<span class="sd"> param_types: (optional) maps explicit types for one or more param values;</span>
<span class="sd"> required if parameters are passed.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">params</span><span class="p">:</span>
<span class="k">assert</span> <span class="n">param_types</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="k">return</span> <span class="bp">cls</span><span class="p">(</span>
<span class="n">is_sql</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">is_table</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">read_operation</span><span class="o">=</span><span class="s2">&quot;process_query_batch&quot;</span><span class="p">,</span>
<span class="n">kwargs</span><span class="o">=</span><span class="p">{</span>
<span class="s1">&#39;sql&#39;</span><span class="p">:</span> <span class="n">sql</span><span class="p">,</span> <span class="s1">&#39;params&#39;</span><span class="p">:</span> <span class="n">params</span><span class="p">,</span> <span class="s1">&#39;param_types&#39;</span><span class="p">:</span> <span class="n">param_types</span>
<span class="p">})</span></div>
<div class="viewcode-block" id="ReadOperation.table"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.ReadOperation.table">[docs]</a> <span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">table</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">index</span><span class="o">=</span><span class="s2">&quot;&quot;</span><span class="p">,</span> <span class="n">keyset</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A convenient method to construct ReadOperation from table.</span>
<span class="sd"> Args:</span>
<span class="sd"> table: name of the table from which to fetch data.</span>
<span class="sd"> columns: names of columns to be retrieved.</span>
<span class="sd"> index: (optional) name of index to use, rather than the table&#39;s primary</span>
<span class="sd"> key.</span>
<span class="sd"> keyset: (optional) `KeySet` keys / ranges identifying rows to be</span>
<span class="sd"> retrieved.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">keyset</span> <span class="o">=</span> <span class="n">keyset</span> <span class="ow">or</span> <span class="n">KeySet</span><span class="p">(</span><span class="n">all_</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">keyset</span><span class="p">,</span> <span class="n">KeySet</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;keyset must be an instance of class &quot;</span>
<span class="s2">&quot;google.cloud.spanner.KeySet&quot;</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">cls</span><span class="p">(</span>
<span class="n">is_sql</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">is_table</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">read_operation</span><span class="o">=</span><span class="s2">&quot;process_read_batch&quot;</span><span class="p">,</span>
<span class="n">kwargs</span><span class="o">=</span><span class="p">{</span>
<span class="s1">&#39;table&#39;</span><span class="p">:</span> <span class="n">table</span><span class="p">,</span>
<span class="s1">&#39;columns&#39;</span><span class="p">:</span> <span class="n">columns</span><span class="p">,</span>
<span class="s1">&#39;index&#39;</span><span class="p">:</span> <span class="n">index</span><span class="p">,</span>
<span class="s1">&#39;keyset&#39;</span><span class="p">:</span> <span class="n">keyset</span>
<span class="p">})</span></div></div>
<span class="k">class</span> <span class="nc">_BeamSpannerConfiguration</span><span class="p">(</span><span class="n">namedtuple</span><span class="p">(</span><span class="s2">&quot;_BeamSpannerConfiguration&quot;</span><span class="p">,</span>
<span class="p">[</span><span class="s2">&quot;project&quot;</span><span class="p">,</span>
<span class="s2">&quot;instance&quot;</span><span class="p">,</span>
<span class="s2">&quot;database&quot;</span><span class="p">,</span>
<span class="s2">&quot;table&quot;</span><span class="p">,</span>
<span class="s2">&quot;query_name&quot;</span><span class="p">,</span>
<span class="s2">&quot;credentials&quot;</span><span class="p">,</span>
<span class="s2">&quot;pool&quot;</span><span class="p">,</span>
<span class="s2">&quot;snapshot_read_timestamp&quot;</span><span class="p">,</span>
<span class="s2">&quot;snapshot_exact_staleness&quot;</span><span class="p">])):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A namedtuple holds the immutable data of the connection string to the cloud</span>
<span class="sd"> spanner.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">snapshot_options</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">snapshot_options</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">snapshot_exact_staleness</span><span class="p">:</span>
<span class="n">snapshot_options</span><span class="p">[</span><span class="s1">&#39;exact_staleness&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">snapshot_exact_staleness</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">snapshot_read_timestamp</span><span class="p">:</span>
<span class="n">snapshot_options</span><span class="p">[</span><span class="s1">&#39;read_timestamp&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">snapshot_read_timestamp</span>
<span class="k">return</span> <span class="n">snapshot_options</span>
<span class="nd">@with_input_types</span><span class="p">(</span><span class="n">ReadOperation</span><span class="p">,</span> <span class="n">_SPANNER_TRANSACTION</span><span class="p">)</span>
<span class="nd">@with_output_types</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">List</span><span class="p">[</span><span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">])</span>
<span class="k">class</span> <span class="nc">_NaiveSpannerReadDoFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">spanner_configuration</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A naive version of Spanner read which uses the transaction API of the</span>
<span class="sd"> cloud spanner.</span>
<span class="sd"> https://googleapis.dev/python/spanner/latest/transaction-api.html</span>
<span class="sd"> In Naive reads, this transform performs single reads, where as the</span>
<span class="sd"> Batch reads use the spanner partitioning query to create batches.</span>
<span class="sd"> Args:</span>
<span class="sd"> spanner_configuration: (_BeamSpannerConfiguration) Connection details to</span>
<span class="sd"> connect with cloud spanner.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span> <span class="o">=</span> <span class="n">spanner_configuration</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_session</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span> <span class="o">=</span> <span class="p">{</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SERVICE_LABEL</span><span class="p">:</span> <span class="s1">&#39;Spanner&#39;</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">METHOD_LABEL</span><span class="p">:</span> <span class="s1">&#39;Read&#39;</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_PROJECT_ID</span><span class="p">:</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">),</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_DATABASE_ID</span><span class="p">:</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">),</span>
<span class="p">}</span>
<span class="k">def</span> <span class="nf">_table_metric</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">table_id</span><span class="p">,</span> <span class="n">status</span><span class="p">):</span>
<span class="n">database_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span>
<span class="n">project_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span>
<span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">SpannerTable</span><span class="p">(</span>
<span class="n">project_id</span><span class="p">,</span> <span class="n">database_id</span><span class="p">,</span> <span class="n">table_id</span><span class="p">)</span>
<span class="n">labels</span> <span class="o">=</span> <span class="p">{</span>
<span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_TABLE_ID</span><span class="p">:</span> <span class="n">table_id</span>
<span class="p">}</span>
<span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span>
<span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span>
<span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span>
<span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">status</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">_query_metric</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">query_name</span><span class="p">,</span> <span class="n">status</span><span class="p">):</span>
<span class="n">project_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span>
<span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">SpannerSqlQuery</span><span class="p">(</span><span class="n">project_id</span><span class="p">,</span> <span class="n">query_name</span><span class="p">)</span>
<span class="n">labels</span> <span class="o">=</span> <span class="p">{</span>
<span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_QUERY_NAME</span><span class="p">:</span> <span class="n">query_name</span>
<span class="p">}</span>
<span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span>
<span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span>
<span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span>
<span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">status</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">_get_session</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_session</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">session</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_session</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_database</span><span class="o">.</span><span class="n">session</span><span class="p">()</span>
<span class="n">session</span><span class="o">.</span><span class="n">create</span><span class="p">()</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_session</span>
<span class="k">def</span> <span class="nf">_close_session</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_session</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_session</span><span class="o">.</span><span class="n">delete</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">setup</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># setting up client to connect with cloud spanner</span>
<span class="n">spanner_client</span> <span class="o">=</span> <span class="n">Client</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">)</span>
<span class="n">instance</span> <span class="o">=</span> <span class="n">spanner_client</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">instance</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_database</span> <span class="o">=</span> <span class="n">instance</span><span class="o">.</span><span class="n">database</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">pool</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">spanner_transaction</span><span class="p">):</span>
<span class="c1"># `spanner_transaction` should be an instance of the _SPANNER_TRANSACTION</span>
<span class="c1"># object.</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">spanner_transaction</span><span class="p">,</span> <span class="n">_SPANNER_TRANSACTION</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;Invalid transaction object: </span><span class="si">%s</span><span class="s2">. It should be instance &quot;</span>
<span class="s2">&quot;of SPANNER_TRANSACTION object created by &quot;</span>
<span class="s2">&quot;spannerio.create_transaction transform.&quot;</span> <span class="o">%</span> <span class="nb">type</span><span class="p">(</span><span class="n">spanner_transaction</span><span class="p">))</span>
<span class="n">transaction_info</span> <span class="o">=</span> <span class="n">spanner_transaction</span><span class="o">.</span><span class="n">transaction</span>
<span class="c1"># We use a batch snapshot to reuse the same transaction passed through the</span>
<span class="c1"># side input.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="n">BatchSnapshot</span><span class="o">.</span><span class="n">from_dict</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_database</span><span class="p">,</span> <span class="n">transaction_info</span><span class="p">)</span>
<span class="c1"># getting the transaction from the snapshot&#39;s session to run read operation.</span>
<span class="c1"># with self._snapshot.session().transaction() as transaction:</span>
<span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_session</span><span class="p">()</span><span class="o">.</span><span class="n">transaction</span><span class="p">()</span> <span class="k">as</span> <span class="n">transaction</span><span class="p">:</span>
<span class="n">table_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">table</span>
<span class="n">query_name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">query_name</span> <span class="ow">or</span> <span class="s1">&#39;&#39;</span>
<span class="k">if</span> <span class="n">element</span><span class="o">.</span><span class="n">is_sql</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span>
<span class="n">transaction_read</span> <span class="o">=</span> <span class="n">transaction</span><span class="o">.</span><span class="n">execute_sql</span>
<span class="n">metric_action</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_query_metric</span>
<span class="n">metric_id</span> <span class="o">=</span> <span class="n">query_name</span>
<span class="k">elif</span> <span class="n">element</span><span class="o">.</span><span class="n">is_table</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span>
<span class="n">transaction_read</span> <span class="o">=</span> <span class="n">transaction</span><span class="o">.</span><span class="n">read</span>
<span class="n">metric_action</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_table_metric</span>
<span class="n">metric_id</span> <span class="o">=</span> <span class="n">table_id</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;ReadOperation is improperly configure: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="nb">str</span><span class="p">(</span><span class="n">element</span><span class="p">))</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">for</span> <span class="n">row</span> <span class="ow">in</span> <span class="n">transaction_read</span><span class="p">(</span><span class="o">**</span><span class="n">element</span><span class="o">.</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">yield</span> <span class="n">row</span>
<span class="n">metric_action</span><span class="p">(</span><span class="n">metric_id</span><span class="p">,</span> <span class="s1">&#39;ok&#39;</span><span class="p">)</span>
<span class="k">except</span> <span class="p">(</span><span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span><span class="p">)</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="n">metric_action</span><span class="p">(</span><span class="n">metric_id</span><span class="p">,</span> <span class="n">e</span><span class="o">.</span><span class="n">code</span><span class="o">.</span><span class="n">value</span><span class="p">)</span>
<span class="k">raise</span>
<span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="n">metric_action</span><span class="p">(</span><span class="n">metric_id</span><span class="p">,</span> <span class="n">e</span><span class="p">)</span>
<span class="k">raise</span>
<span class="nd">@with_input_types</span><span class="p">(</span><span class="n">ReadOperation</span><span class="p">)</span>
<span class="nd">@with_output_types</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">Dict</span><span class="p">[</span><span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">])</span>
<span class="k">class</span> <span class="nc">_CreateReadPartitions</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A DoFn to create partitions. Uses the Partitioning API (PartitionRead /</span>
<span class="sd"> PartitionQuery) request to start a partitioned query operation. Returns a</span>
<span class="sd"> list of batch information needed to perform the actual queries.</span>
<span class="sd"> If the element (an instance of :class:`ReadOperation`) is to perform a sql</span>
<span class="sd"> query, the `PartitionQuery` API is used to create partitions and returns</span>
<span class="sd"> mappings of information used to perform the actual partitioned reads via</span>
<span class="sd"> :meth:`process_query_batch`.</span>
<span class="sd"> If the element (an instance of :class:`ReadOperation`) is to perform a read</span>
<span class="sd"> from a table, the `PartitionRead` API is used to create partitions and returns</span>
<span class="sd"> mappings of information used to perform the actual partitioned reads via</span>
<span class="sd"> :meth:`process_read_batch`.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">spanner_configuration</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span> <span class="o">=</span> <span class="n">spanner_configuration</span>
<span class="k">def</span> <span class="nf">setup</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">spanner_client</span> <span class="o">=</span> <span class="n">Client</span><span class="p">(</span>
<span class="n">project</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">,</span>
<span class="n">credentials</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">credentials</span><span class="p">)</span>
<span class="n">instance</span> <span class="o">=</span> <span class="n">spanner_client</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">instance</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_database</span> <span class="o">=</span> <span class="n">instance</span><span class="o">.</span><span class="n">database</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">pool</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_database</span><span class="o">.</span><span class="n">batch_snapshot</span><span class="p">(</span>
<span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">snapshot_options</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_snapshot_dict</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">to_dict</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="k">if</span> <span class="n">element</span><span class="o">.</span><span class="n">is_sql</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span>
<span class="n">partitioning_action</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">generate_query_batches</span>
<span class="k">elif</span> <span class="n">element</span><span class="o">.</span><span class="n">is_table</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span>
<span class="n">partitioning_action</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">generate_read_batches</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;ReadOperation is improperly configure: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="nb">str</span><span class="p">(</span><span class="n">element</span><span class="p">))</span>
<span class="k">for</span> <span class="n">p</span> <span class="ow">in</span> <span class="n">partitioning_action</span><span class="p">(</span><span class="o">**</span><span class="n">element</span><span class="o">.</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">yield</span> <span class="p">{</span>
<span class="s2">&quot;is_sql&quot;</span><span class="p">:</span> <span class="n">element</span><span class="o">.</span><span class="n">is_sql</span><span class="p">,</span>
<span class="s2">&quot;is_table&quot;</span><span class="p">:</span> <span class="n">element</span><span class="o">.</span><span class="n">is_table</span><span class="p">,</span>
<span class="s2">&quot;read_operation&quot;</span><span class="p">:</span> <span class="n">element</span><span class="o">.</span><span class="n">read_operation</span><span class="p">,</span>
<span class="s2">&quot;partitions&quot;</span><span class="p">:</span> <span class="n">p</span><span class="p">,</span>
<span class="s2">&quot;transaction_info&quot;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot_dict</span>
<span class="p">}</span>
<span class="nd">@with_input_types</span><span class="p">(</span><span class="nb">int</span><span class="p">)</span>
<span class="nd">@with_output_types</span><span class="p">(</span><span class="n">_SPANNER_TRANSACTION</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_CreateTransactionFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A DoFn to create the transaction of cloud spanner.</span>
<span class="sd"> It connects to the database and returns the transaction_id and session_id</span>
<span class="sd"> by using the batch_snapshot.to_dict() method available in the google cloud</span>
<span class="sd"> spanner sdk.</span>
<span class="sd"> https://googleapis.dev/python/spanner/latest/database-api.html?highlight=</span>
<span class="sd"> batch_snapshot#google.cloud.spanner_v1.database.BatchSnapshot.to_dict</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">project_id</span><span class="p">,</span>
<span class="n">instance_id</span><span class="p">,</span>
<span class="n">database_id</span><span class="p">,</span>
<span class="n">credentials</span><span class="p">,</span>
<span class="n">pool</span><span class="p">,</span>
<span class="n">read_timestamp</span><span class="p">,</span>
<span class="n">exact_staleness</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_project_id</span> <span class="o">=</span> <span class="n">project_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_instance_id</span> <span class="o">=</span> <span class="n">instance_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_database_id</span> <span class="o">=</span> <span class="n">database_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_credentials</span> <span class="o">=</span> <span class="n">credentials</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pool</span> <span class="o">=</span> <span class="n">pool</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_snapshot_options</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">if</span> <span class="n">read_timestamp</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_snapshot_options</span><span class="p">[</span><span class="s1">&#39;read_timestamp&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">read_timestamp</span>
<span class="k">if</span> <span class="n">exact_staleness</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_snapshot_options</span><span class="p">[</span><span class="s1">&#39;exact_staleness&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">exact_staleness</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="nf">setup</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spanner_client</span> <span class="o">=</span> <span class="n">Client</span><span class="p">(</span>
<span class="n">project</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_project_id</span><span class="p">,</span> <span class="n">credentials</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_credentials</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_instance</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_client</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_instance_id</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_database</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_instance</span><span class="o">.</span><span class="n">database</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_database_id</span><span class="p">,</span> <span class="n">pool</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_pool</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_database</span><span class="o">.</span><span class="n">batch_snapshot</span><span class="p">(</span><span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">_snapshot_options</span><span class="p">)</span>
<span class="k">return</span> <span class="p">[</span><span class="n">_SPANNER_TRANSACTION</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">to_dict</span><span class="p">())]</span>
<div class="viewcode-block" id="create_transaction"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.create_transaction">[docs]</a><span class="nd">@ptransform_fn</span>
<span class="k">def</span> <span class="nf">create_transaction</span><span class="p">(</span>
<span class="n">pbegin</span><span class="p">,</span>
<span class="n">project_id</span><span class="p">,</span>
<span class="n">instance_id</span><span class="p">,</span>
<span class="n">database_id</span><span class="p">,</span>
<span class="n">credentials</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">read_timestamp</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">exact_staleness</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A PTransform method to create a batch transaction.</span>
<span class="sd"> Args:</span>
<span class="sd"> pbegin: Root of the pipeline</span>
<span class="sd"> project_id: Cloud spanner project id. Be sure to use the Project ID,</span>
<span class="sd"> not the Project Number.</span>
<span class="sd"> instance_id: Cloud spanner instance id.</span>
<span class="sd"> database_id: Cloud spanner database id.</span>
<span class="sd"> credentials: (optional) The authorization credentials to attach to requests.</span>
<span class="sd"> These credentials identify this application to the service.</span>
<span class="sd"> If none are specified, the client will attempt to ascertain</span>
<span class="sd"> the credentials from the environment.</span>
<span class="sd"> pool: (optional) session pool to be used by database. If not passed,</span>
<span class="sd"> Spanner Cloud SDK uses the BurstyPool by default.</span>
<span class="sd"> `google.cloud.spanner.BurstyPool`. Ref:</span>
<span class="sd"> https://googleapis.dev/python/spanner/latest/database-api.html?#google.</span>
<span class="sd"> cloud.spanner_v1.database.Database</span>
<span class="sd"> read_timestamp: (optional) An instance of the `datetime.datetime` object to</span>
<span class="sd"> execute all reads at the given timestamp.</span>
<span class="sd"> exact_staleness: (optional) An instance of the `datetime.timedelta`</span>
<span class="sd"> object. Executes all reads at a timestamp that is `exact_staleness`</span>
<span class="sd"> old.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pbegin</span><span class="p">,</span> <span class="n">PBegin</span><span class="p">)</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">pbegin</span> <span class="o">|</span> <span class="n">Create</span><span class="p">([</span><span class="mi">1</span><span class="p">])</span> <span class="o">|</span> <span class="n">ParDo</span><span class="p">(</span>
<span class="n">_CreateTransactionFn</span><span class="p">(</span>
<span class="n">project_id</span><span class="p">,</span>
<span class="n">instance_id</span><span class="p">,</span>
<span class="n">database_id</span><span class="p">,</span>
<span class="n">credentials</span><span class="p">,</span>
<span class="n">pool</span><span class="p">,</span>
<span class="n">read_timestamp</span><span class="p">,</span>
<span class="n">exact_staleness</span><span class="p">)))</span></div>
<span class="nd">@with_input_types</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">Dict</span><span class="p">[</span><span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">])</span>
<span class="nd">@with_output_types</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">List</span><span class="p">[</span><span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">])</span>
<span class="k">class</span> <span class="nc">_ReadFromPartitionFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A DoFn to perform reads from the partition.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">spanner_configuration</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span> <span class="o">=</span> <span class="n">spanner_configuration</span>
<span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span> <span class="o">=</span> <span class="p">{</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SERVICE_LABEL</span><span class="p">:</span> <span class="s1">&#39;Spanner&#39;</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">METHOD_LABEL</span><span class="p">:</span> <span class="s1">&#39;Read&#39;</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_PROJECT_ID</span><span class="p">:</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">),</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_DATABASE_ID</span><span class="p">:</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">),</span>
<span class="p">}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">service_metric</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="nf">_table_metric</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">table_id</span><span class="p">):</span>
<span class="n">database_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span>
<span class="n">project_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span>
<span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">SpannerTable</span><span class="p">(</span>
<span class="n">project_id</span><span class="p">,</span> <span class="n">database_id</span><span class="p">,</span> <span class="n">table_id</span><span class="p">)</span>
<span class="n">labels</span> <span class="o">=</span> <span class="p">{</span>
<span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_TABLE_ID</span><span class="p">:</span> <span class="n">table_id</span>
<span class="p">}</span>
<span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span>
<span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span>
<span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span>
<span class="k">return</span> <span class="n">service_call_metric</span>
<span class="k">def</span> <span class="nf">_query_metric</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">query_name</span><span class="p">):</span>
<span class="n">project_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span>
<span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">SpannerSqlQuery</span><span class="p">(</span><span class="n">project_id</span><span class="p">,</span> <span class="n">query_name</span><span class="p">)</span>
<span class="n">labels</span> <span class="o">=</span> <span class="p">{</span>
<span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_QUERY_NAME</span><span class="p">:</span> <span class="n">query_name</span>
<span class="p">}</span>
<span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span>
<span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span>
<span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span>
<span class="k">return</span> <span class="n">service_call_metric</span>
<span class="k">def</span> <span class="nf">setup</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">spanner_client</span> <span class="o">=</span> <span class="n">Client</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">)</span>
<span class="n">instance</span> <span class="o">=</span> <span class="n">spanner_client</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">instance</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_database</span> <span class="o">=</span> <span class="n">instance</span><span class="o">.</span><span class="n">database</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">pool</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_database</span><span class="o">.</span><span class="n">batch_snapshot</span><span class="p">(</span>
<span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">snapshot_options</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="n">BatchSnapshot</span><span class="o">.</span><span class="n">from_dict</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_database</span><span class="p">,</span> <span class="n">element</span><span class="p">[</span><span class="s1">&#39;transaction_info&#39;</span><span class="p">])</span>
<span class="n">table_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">table</span>
<span class="n">query_name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">query_name</span> <span class="ow">or</span> <span class="s1">&#39;&#39;</span>
<span class="k">if</span> <span class="n">element</span><span class="p">[</span><span class="s1">&#39;is_sql&#39;</span><span class="p">]</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span>
<span class="n">read_action</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">process_query_batch</span>
<span class="bp">self</span><span class="o">.</span><span class="n">service_metric</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_query_metric</span><span class="p">(</span><span class="n">query_name</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">element</span><span class="p">[</span><span class="s1">&#39;is_table&#39;</span><span class="p">]</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span>
<span class="n">read_action</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">process_read_batch</span>
<span class="bp">self</span><span class="o">.</span><span class="n">service_metric</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_table_metric</span><span class="p">(</span><span class="n">table_id</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;ReadOperation is improperly configure: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="nb">str</span><span class="p">(</span><span class="n">element</span><span class="p">))</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">for</span> <span class="n">row</span> <span class="ow">in</span> <span class="n">read_action</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="s1">&#39;partitions&#39;</span><span class="p">]):</span>
<span class="k">yield</span> <span class="n">row</span>
<span class="bp">self</span><span class="o">.</span><span class="n">service_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="s1">&#39;ok&#39;</span><span class="p">)</span>
<span class="k">except</span> <span class="p">(</span><span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span><span class="p">)</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">service_metric</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">code</span><span class="o">.</span><span class="n">value</span><span class="p">))</span>
<span class="k">raise</span>
<span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">service_metric</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">))</span>
<span class="k">raise</span>
<span class="k">def</span> <span class="nf">teardown</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<div class="viewcode-block" id="ReadFromSpanner"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.ReadFromSpanner">[docs]</a><span class="nd">@experimental</span><span class="p">(</span><span class="n">extra_message</span><span class="o">=</span><span class="s2">&quot;No backwards-compatibility guarantees.&quot;</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">ReadFromSpanner</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A PTransform to perform reads from cloud spanner.</span>
<span class="sd"> ReadFromSpanner uses BatchAPI to perform all read operations.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">project_id</span><span class="p">,</span> <span class="n">instance_id</span><span class="p">,</span> <span class="n">database_id</span><span class="p">,</span> <span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">read_timestamp</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">exact_staleness</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">credentials</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">sql</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">params</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">param_types</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># with_query</span>
<span class="n">table</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">query_name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">index</span><span class="o">=</span><span class="s2">&quot;&quot;</span><span class="p">,</span>
<span class="n">keyset</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># with_table</span>
<span class="n">read_operations</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># for read all</span>
<span class="n">transaction</span><span class="o">=</span><span class="kc">None</span>
<span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A PTransform that uses Spanner Batch API to perform reads.</span>
<span class="sd"> Args:</span>
<span class="sd"> project_id: Cloud spanner project id. Be sure to use the Project ID,</span>
<span class="sd"> not the Project Number.</span>
<span class="sd"> instance_id: Cloud spanner instance id.</span>
<span class="sd"> database_id: Cloud spanner database id.</span>
<span class="sd"> pool: (optional) Session pool to be used by the database. If not passed,</span>
<span class="sd"> the Cloud Spanner SDK uses `google.cloud.spanner.BurstyPool` by</span>
<span class="sd"> default. Ref:</span>
<span class="sd"> https://googleapis.dev/python/spanner/latest/database-api.html?#google.</span>
<span class="sd"> cloud.spanner_v1.database.Database</span>
<span class="sd"> read_timestamp: (optional) An instance of the `datetime.datetime` object</span>
<span class="sd"> to execute all reads at the given timestamp. By default, set to `None`.</span>
<span class="sd"> exact_staleness: (optional) An instance of the `datetime.timedelta`</span>
<span class="sd"> object. Executes all reads at a timestamp that is exactly this</span>
<span class="sd"> duration older than the current time. By default, set to `None`.</span>
<span class="sd"> credentials: (optional) The authorization credentials to attach to</span>
<span class="sd"> requests. These credentials identify this application to the service.</span>
<span class="sd"> If none are specified, the client will attempt to ascertain</span>
<span class="sd"> the credentials from the environment. By default, set to `None`.</span>
<span class="sd"> sql: (optional) SQL query statement.</span>
<span class="sd"> params: (optional) Values for parameter replacement. Keys must match the</span>
<span class="sd"> names used in sql. By default, set to `None`.</span>
<span class="sd"> param_types: (optional) maps explicit types for one or more param values;</span>
<span class="sd"> required if params are passed. By default, set to `None`.</span>
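<span class="sd"> table: (optional) Name of the table from which to fetch data. By</span>
<span class="sd"> default, set to `None`.</span>
<span class="sd"> query_name: (optional) Name identifying the SQL query in the request</span>
<span class="sd"> count metrics reported for reads. By default, set to `None`.</span>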
<span class="sd"> columns: (optional) List of names of columns to be retrieved; required if</span>
<span class="sd"> the table is passed. By default, set to `None`.</span>
<span class="sd"> index: (optional) Name of the index to use, rather than the table&#39;s</span>
<span class="sd"> primary key. By default, set to an empty string.</span>
<span class="sd"> keyset: (optional) keys / ranges identifying rows to be retrieved. By</span>
<span class="sd"> default, set to `None`.</span>
<span class="sd"> read_operations: (optional) List of :class:`ReadOperation` objects to</span>
<span class="sd"> read in a single transform (read all). By default, set to `None`.</span>
<span class="sd"> transaction: (optional) PCollection produced by</span>
<span class="sd"> :meth:`create_transaction`; when set, a naive read (no partitioned</span>
<span class="sd"> batches) is performed using it. By default, set to `None`.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_configuration</span> <span class="o">=</span> <span class="n">_BeamSpannerConfiguration</span><span class="p">(</span>
<span class="n">project</span><span class="o">=</span><span class="n">project_id</span><span class="p">,</span>
<span class="n">instance</span><span class="o">=</span><span class="n">instance_id</span><span class="p">,</span>
<span class="n">database</span><span class="o">=</span><span class="n">database_id</span><span class="p">,</span>
<span class="n">table</span><span class="o">=</span><span class="n">table</span><span class="p">,</span>
<span class="n">query_name</span><span class="o">=</span><span class="n">query_name</span><span class="p">,</span>
<span class="n">credentials</span><span class="o">=</span><span class="n">credentials</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span>
<span class="n">snapshot_read_timestamp</span><span class="o">=</span><span class="n">read_timestamp</span><span class="p">,</span>
<span class="n">snapshot_exact_staleness</span><span class="o">=</span><span class="n">exact_staleness</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="o">=</span> <span class="n">read_operations</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_transaction</span> <span class="o">=</span> <span class="n">transaction</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="n">table</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="n">columns</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;Columns are required with the table name.&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="o">=</span> <span class="p">[</span>
<span class="n">ReadOperation</span><span class="o">.</span><span class="n">table</span><span class="p">(</span>
<span class="n">table</span><span class="o">=</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="n">columns</span><span class="p">,</span> <span class="n">index</span><span class="o">=</span><span class="n">index</span><span class="p">,</span> <span class="n">keyset</span><span class="o">=</span><span class="n">keyset</span><span class="p">)</span>
<span class="p">]</span>
<span class="k">elif</span> <span class="n">sql</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="o">=</span> <span class="p">[</span>
<span class="n">ReadOperation</span><span class="o">.</span><span class="n">query</span><span class="p">(</span>
<span class="n">sql</span><span class="o">=</span><span class="n">sql</span><span class="p">,</span> <span class="n">params</span><span class="o">=</span><span class="n">params</span><span class="p">,</span> <span class="n">param_types</span><span class="o">=</span><span class="n">param_types</span><span class="p">)</span>
<span class="p">]</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pbegin</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pbegin</span><span class="p">,</span> <span class="n">PBegin</span><span class="p">):</span>
<span class="n">pcoll</span> <span class="o">=</span> <span class="n">pbegin</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">|</span> <span class="n">Create</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span><span class="p">)</span>
<span class="k">elif</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pbegin</span><span class="p">,</span> <span class="n">PBegin</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;Read operation in the constructor only works with &quot;</span>
<span class="s2">&quot;the root of the pipeline.&quot;</span><span class="p">)</span>
<span class="n">pcoll</span> <span class="o">=</span> <span class="n">pbegin</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;Spanner required read operation, sql or table &quot;</span>
<span class="s2">&quot;with columns.&quot;</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_transaction</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># reading as batch read using the spanner partitioning query to create</span>
<span class="c1"># batches.</span>
<span class="n">p</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">pcoll</span>
<span class="o">|</span> <span class="s1">&#39;Generate Partitions&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ParDo</span><span class="p">(</span>
<span class="n">_CreateReadPartitions</span><span class="p">(</span><span class="n">spanner_configuration</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_configuration</span><span class="p">))</span>
<span class="o">|</span> <span class="s1">&#39;Reshuffle&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">Reshuffle</span><span class="p">()</span>
<span class="o">|</span> <span class="s1">&#39;Read From Partitions&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ParDo</span><span class="p">(</span>
<span class="n">_ReadFromPartitionFn</span><span class="p">(</span><span class="n">spanner_configuration</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_configuration</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># reading as naive read, in which we don&#39;t make batches and execute the</span>
<span class="c1"># queries as a single read.</span>
<span class="n">p</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">pcoll</span>
<span class="o">|</span> <span class="s1">&#39;Reshuffle&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">Reshuffle</span><span class="p">()</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">ReadOperation</span><span class="p">)</span>
<span class="o">|</span> <span class="s1">&#39;Perform Read&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ParDo</span><span class="p">(</span>
<span class="n">_NaiveSpannerReadDoFn</span><span class="p">(</span><span class="n">spanner_configuration</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_configuration</span><span class="p">),</span>
<span class="n">AsSingleton</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_transaction</span><span class="p">)))</span>
<span class="k">return</span> <span class="n">p</span>
<span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">res</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">sql</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">table</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">for</span> <span class="n">ro</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span><span class="p">:</span>
<span class="k">if</span> <span class="n">ro</span><span class="o">.</span><span class="n">is_sql</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span>
<span class="n">sql</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">ro</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">ro</span><span class="o">.</span><span class="n">is_table</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span>
<span class="n">table</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">ro</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">if</span> <span class="n">sql</span><span class="p">:</span>
<span class="n">res</span><span class="p">[</span><span class="s1">&#39;sql&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">sql</span><span class="p">),</span> <span class="n">label</span><span class="o">=</span><span class="s1">&#39;Sql&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">table</span><span class="p">:</span>
<span class="n">res</span><span class="p">[</span><span class="s1">&#39;table&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">table</span><span class="p">),</span> <span class="n">label</span><span class="o">=</span><span class="s1">&#39;Table&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_transaction</span><span class="p">:</span>
<span class="n">res</span><span class="p">[</span><span class="s1">&#39;transaction&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">DisplayDataItem</span><span class="p">(</span>
<span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_transaction</span><span class="p">),</span> <span class="n">label</span><span class="o">=</span><span class="s1">&#39;transaction&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="n">res</span></div>
<div class="viewcode-block" id="WriteToSpanner"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteToSpanner">[docs]</a><span class="nd">@experimental</span><span class="p">(</span><span class="n">extra_message</span><span class="o">=</span><span class="s2">&quot;No backwards-compatibility guarantees.&quot;</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">WriteToSpanner</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">project_id</span><span class="p">,</span>
<span class="n">instance_id</span><span class="p">,</span>
<span class="n">database_id</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">credentials</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">max_batch_size_bytes</span><span class="o">=</span><span class="mi">1048576</span><span class="p">,</span>
<span class="n">max_number_rows</span><span class="o">=</span><span class="mi">50</span><span class="p">,</span>
<span class="n">max_number_cells</span><span class="o">=</span><span class="mi">500</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A PTransform to write to Google Cloud Spanner.</span>
<span class="sd"> Args:</span>
<span class="sd"> project_id: Cloud Spanner project id. Be sure to use the Project ID,</span>
<span class="sd"> not the Project Number.</span>
<span class="sd"> instance_id: Cloud Spanner instance id.</span>
<span class="sd"> database_id: Cloud Spanner database id.</span>
<span class="sd"> max_batch_size_bytes: (optional) Split the mutations into batches to</span>
<span class="sd"> reduce the number of transactions sent to Spanner. By default it is</span>
<span class="sd"> set to 1 MB (1048576 Bytes).</span>
<span class="sd"> max_number_rows: (optional) Split the mutations into batches to</span>
<span class="sd"> reduce the number of transactions sent to Spanner. By default it is</span>
<span class="sd"> set to 50 rows per batch.</span>
<span class="sd"> max_number_cells: (optional) Split the mutations into batches to</span>
<span class="sd"> reduce the number of transactions sent to Spanner. By default it is</span>
<span class="sd"> set to 500 cells per batch.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_configuration</span> <span class="o">=</span> <span class="n">_BeamSpannerConfiguration</span><span class="p">(</span>
<span class="n">project</span><span class="o">=</span><span class="n">project_id</span><span class="p">,</span>
<span class="n">instance</span><span class="o">=</span><span class="n">instance_id</span><span class="p">,</span>
<span class="n">database</span><span class="o">=</span><span class="n">database_id</span><span class="p">,</span>
<span class="n">table</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">query_name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">credentials</span><span class="o">=</span><span class="n">credentials</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span>
<span class="n">snapshot_read_timestamp</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">snapshot_exact_staleness</span><span class="o">=</span><span class="kc">None</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span> <span class="o">=</span> <span class="n">max_batch_size_bytes</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span> <span class="o">=</span> <span class="n">max_number_rows</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span> <span class="o">=</span> <span class="n">max_number_cells</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_database_id</span> <span class="o">=</span> <span class="n">database_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_project_id</span> <span class="o">=</span> <span class="n">project_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_instance_id</span> <span class="o">=</span> <span class="n">instance_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pool</span> <span class="o">=</span> <span class="n">pool</span>
<span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">res</span> <span class="o">=</span> <span class="p">{</span>
<span class="s1">&#39;project_id&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_project_id</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s1">&#39;Project Id&#39;</span><span class="p">),</span>
<span class="s1">&#39;instance_id&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_instance_id</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s1">&#39;Instance Id&#39;</span><span class="p">),</span>
<span class="s1">&#39;pool&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_pool</span><span class="p">),</span> <span class="n">label</span><span class="o">=</span><span class="s1">&#39;Pool&#39;</span><span class="p">),</span>
<span class="s1">&#39;database&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_database_id</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s1">&#39;Database&#39;</span><span class="p">),</span>
<span class="s1">&#39;batch_size&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s2">&quot;Batch Size&quot;</span><span class="p">),</span>
<span class="s1">&#39;max_number_rows&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s2">&quot;Max Rows&quot;</span><span class="p">),</span>
<span class="s1">&#39;max_number_cells&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s2">&quot;Max Cells&quot;</span><span class="p">),</span>
<span class="p">}</span>
<span class="k">return</span> <span class="n">res</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">pcoll</span>
<span class="o">|</span> <span class="s2">&quot;make batches&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">_WriteGroup</span><span class="p">(</span>
<span class="n">max_batch_size_bytes</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span><span class="p">,</span>
<span class="n">max_number_rows</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span><span class="p">,</span>
<span class="n">max_number_cells</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span><span class="p">)</span>
<span class="o">|</span>
<span class="s1">&#39;Writing to spanner&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ParDo</span><span class="p">(</span><span class="n">_WriteToSpannerDoFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_configuration</span><span class="p">)))</span></div>
<span class="k">class</span> <span class="nc">_Mutator</span><span class="p">(</span><span class="n">namedtuple</span><span class="p">(</span><span class="s1">&#39;_Mutator&#39;</span><span class="p">,</span>
<span class="p">[</span><span class="s2">&quot;mutation&quot;</span><span class="p">,</span> <span class="s2">&quot;operation&quot;</span><span class="p">,</span> <span class="s2">&quot;kwargs&quot;</span><span class="p">,</span> <span class="s2">&quot;rows&quot;</span><span class="p">,</span> <span class="s2">&quot;cells&quot;</span><span class="p">])</span>
<span class="p">):</span>
<span class="vm">__slots__</span> <span class="o">=</span> <span class="p">()</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">byte_size</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">mutation</span><span class="o">.</span><span class="n">ByteSize</span><span class="p">()</span>
<div class="viewcode-block" id="MutationGroup"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.MutationGroup">[docs]</a><span class="k">class</span> <span class="nc">MutationGroup</span><span class="p">(</span><span class="n">deque</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A Bundle of Spanner Mutations (_Mutator).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">info</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">cells</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">rows</span> <span class="o">=</span> <span class="mi">0</span>
<span class="nb">bytes</span> <span class="o">=</span> <span class="mi">0</span>
<span class="k">for</span> <span class="n">m</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="fm">__iter__</span><span class="p">():</span>
<span class="nb">bytes</span> <span class="o">+=</span> <span class="n">m</span><span class="o">.</span><span class="n">byte_size</span>
<span class="n">rows</span> <span class="o">+=</span> <span class="n">m</span><span class="o">.</span><span class="n">rows</span>
<span class="n">cells</span> <span class="o">+=</span> <span class="n">m</span><span class="o">.</span><span class="n">cells</span>
<span class="k">return</span> <span class="p">{</span><span class="s2">&quot;rows&quot;</span><span class="p">:</span> <span class="n">rows</span><span class="p">,</span> <span class="s2">&quot;cells&quot;</span><span class="p">:</span> <span class="n">cells</span><span class="p">,</span> <span class="s2">&quot;byte_size&quot;</span><span class="p">:</span> <span class="nb">bytes</span><span class="p">}</span>
<div class="viewcode-block" id="MutationGroup.primary"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.MutationGroup.primary">[docs]</a> <span class="k">def</span> <span class="nf">primary</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">next</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="fm">__iter__</span><span class="p">())</span></div></div>
<div class="viewcode-block" id="WriteMutation"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteMutation">[docs]</a><span class="k">class</span> <span class="nc">WriteMutation</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="n">_OPERATION_DELETE</span> <span class="o">=</span> <span class="s2">&quot;delete&quot;</span>
<span class="n">_OPERATION_INSERT</span> <span class="o">=</span> <span class="s2">&quot;insert&quot;</span>
<span class="n">_OPERATION_INSERT_OR_UPDATE</span> <span class="o">=</span> <span class="s2">&quot;insert_or_update&quot;</span>
<span class="n">_OPERATION_REPLACE</span> <span class="o">=</span> <span class="s2">&quot;replace&quot;</span>
<span class="n">_OPERATION_UPDATE</span> <span class="o">=</span> <span class="s2">&quot;update&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">insert</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">update</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">insert_or_update</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">replace</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">delete</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">columns</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">values</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">keyset</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A convenient class to create Spanner Mutations for Write. User can provide</span>
<span class="sd"> the operation via constructor or via static methods.</span>
<span class="sd"> Note: When passing the operation via the constructor, make sure that</span>
<span class="sd"> only one operation is provided at a time. For example, if a user passes</span>
<span class="sd"> a table name in the `insert` parameter and also passes a value for the</span>
<span class="sd"> `update` parameter, this will cause an error.</span>
<span class="sd"> Args:</span>
<span class="sd"> insert: (Optional) Name of the table in which rows will be inserted.</span>
<span class="sd"> update: (Optional) Name of the table in which existing rows will be</span>
<span class="sd"> updated.</span>
<span class="sd"> insert_or_update: (Optional) Table name in which rows will be written.</span>
<span class="sd"> Like insert, except that if the row already exists, then its column</span>
<span class="sd"> values are overwritten with the ones provided. Any column values not</span>
<span class="sd"> explicitly written are preserved.</span>
<span class="sd"> replace: (Optional) Table name in which rows will be replaced. Like</span>
<span class="sd"> insert, except that if the row already exists, it is deleted, and the</span>
<span class="sd"> column values provided are inserted instead. Unlike `insert_or_update`,</span>
<span class="sd"> this means any values not explicitly written become `NULL`.</span>
<span class="sd"> delete: (Optional) Table name from which rows will be deleted. Succeeds</span>
<span class="sd"> whether or not the named rows were present.</span>
<span class="sd"> columns: The names of the columns in table to be written. The list of</span>
<span class="sd"> columns must contain enough columns to allow Cloud Spanner to derive</span>
<span class="sd"> values for all primary key columns in the row(s) to be modified.</span>
<span class="sd"> values: The values to be written. `values` can contain more than one</span>
<span class="sd"> list of values. If it does, then multiple rows are written, one for</span>
<span class="sd"> each entry in `values`. Each list in `values` must have exactly as</span>
<span class="sd"> many entries as there are entries in columns above. Sending multiple</span>
<span class="sd"> lists is equivalent to sending multiple Mutations, each containing one</span>
<span class="sd"> `values` entry and repeating table and columns.</span>
<span class="sd"> keyset: (Optional) The primary keys of the rows within table to delete.</span>
<span class="sd"> Delete is idempotent. The transaction will succeed even if some or</span>
<span class="sd"> all rows do not exist.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_columns</span> <span class="o">=</span> <span class="n">columns</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_values</span> <span class="o">=</span> <span class="n">values</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_keyset</span> <span class="o">=</span> <span class="n">keyset</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_insert</span> <span class="o">=</span> <span class="n">insert</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_update</span> <span class="o">=</span> <span class="n">update</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_insert_or_update</span> <span class="o">=</span> <span class="n">insert_or_update</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_replace</span> <span class="o">=</span> <span class="n">replace</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_delete</span> <span class="o">=</span> <span class="n">delete</span>
<span class="k">if</span> <span class="nb">sum</span><span class="p">([</span><span class="mi">1</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">_insert</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_update</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_insert_or_update</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_replace</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_delete</span><span class="p">]</span> <span class="k">if</span> <span class="n">x</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">])</span> <span class="o">!=</span> <span class="mi">1</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;No or more than one write mutation operation &quot;</span>
<span class="s2">&quot;provided: &lt;</span><span class="si">%s</span><span class="s2">: </span><span class="si">%s</span><span class="s2">&gt;&quot;</span> <span class="o">%</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">)))</span>
<span class="k">def</span> <span class="fm">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_insert</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">insert</span><span class="p">(</span>
<span class="n">table</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_insert</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_columns</span><span class="p">,</span> <span class="n">values</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_values</span><span class="p">)</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_update</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">update</span><span class="p">(</span>
<span class="n">table</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_update</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_columns</span><span class="p">,</span> <span class="n">values</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_values</span><span class="p">)</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_insert_or_update</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">insert_or_update</span><span class="p">(</span>
<span class="n">table</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_insert_or_update</span><span class="p">,</span>
<span class="n">columns</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_columns</span><span class="p">,</span>
<span class="n">values</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_values</span><span class="p">)</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_replace</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span>
<span class="n">table</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_replace</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_columns</span><span class="p">,</span> <span class="n">values</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_values</span><span class="p">)</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_delete</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">delete</span><span class="p">(</span><span class="n">table</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_delete</span><span class="p">,</span> <span class="n">keyset</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_keyset</span><span class="p">)</span>
<div class="viewcode-block" id="WriteMutation.insert"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteMutation.insert">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">insert</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Insert one or more new table rows.</span>
<span class="sd"> Args:</span>
<span class="sd"> table: Name of the table to be modified.</span>
<span class="sd"> columns: Names of the table columns to be modified.</span>
<span class="sd"> values: Values to be modified.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">rows</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span>
<span class="n">cells</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">columns</span><span class="p">)</span> <span class="o">*</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_Mutator</span><span class="p">(</span>
<span class="n">mutation</span><span class="o">=</span><span class="n">Mutation</span><span class="p">(</span><span class="n">insert</span><span class="o">=</span><span class="n">batch</span><span class="o">.</span><span class="n">_make_write_pb</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">)),</span>
<span class="n">operation</span><span class="o">=</span><span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_INSERT</span><span class="p">,</span>
<span class="n">rows</span><span class="o">=</span><span class="n">rows</span><span class="p">,</span>
<span class="n">cells</span><span class="o">=</span><span class="n">cells</span><span class="p">,</span>
<span class="n">kwargs</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;table&quot;</span><span class="p">:</span> <span class="n">table</span><span class="p">,</span> <span class="s2">&quot;columns&quot;</span><span class="p">:</span> <span class="n">columns</span><span class="p">,</span> <span class="s2">&quot;values&quot;</span><span class="p">:</span> <span class="n">values</span>
<span class="p">})</span></div>
<div class="viewcode-block" id="WriteMutation.update"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteMutation.update">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">update</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Update one or more existing table rows.</span>
<span class="sd"> Args:</span>
<span class="sd"> table: Name of the table to be modified.</span>
<span class="sd"> columns: Names of the table columns to be modified.</span>
<span class="sd"> values: Values to be modified.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">rows</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span>
<span class="n">cells</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">columns</span><span class="p">)</span> <span class="o">*</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_Mutator</span><span class="p">(</span>
<span class="n">mutation</span><span class="o">=</span><span class="n">Mutation</span><span class="p">(</span><span class="n">update</span><span class="o">=</span><span class="n">batch</span><span class="o">.</span><span class="n">_make_write_pb</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">)),</span>
<span class="n">operation</span><span class="o">=</span><span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_UPDATE</span><span class="p">,</span>
<span class="n">rows</span><span class="o">=</span><span class="n">rows</span><span class="p">,</span>
<span class="n">cells</span><span class="o">=</span><span class="n">cells</span><span class="p">,</span>
<span class="n">kwargs</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;table&quot;</span><span class="p">:</span> <span class="n">table</span><span class="p">,</span> <span class="s2">&quot;columns&quot;</span><span class="p">:</span> <span class="n">columns</span><span class="p">,</span> <span class="s2">&quot;values&quot;</span><span class="p">:</span> <span class="n">values</span>
<span class="p">})</span></div>
<div class="viewcode-block" id="WriteMutation.insert_or_update"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteMutation.insert_or_update">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">insert_or_update</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Insert/update one or more table rows.</span>
<span class="sd"> Args:</span>
<span class="sd"> table: Name of the table to be modified.</span>
<span class="sd"> columns: Names of the table columns to be modified.</span>
<span class="sd"> values: Values to be modified.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">rows</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span>
<span class="n">cells</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">columns</span><span class="p">)</span> <span class="o">*</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_Mutator</span><span class="p">(</span>
<span class="n">mutation</span><span class="o">=</span><span class="n">Mutation</span><span class="p">(</span>
<span class="n">insert_or_update</span><span class="o">=</span><span class="n">batch</span><span class="o">.</span><span class="n">_make_write_pb</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">)),</span>
<span class="n">operation</span><span class="o">=</span><span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_INSERT_OR_UPDATE</span><span class="p">,</span>
<span class="n">rows</span><span class="o">=</span><span class="n">rows</span><span class="p">,</span>
<span class="n">cells</span><span class="o">=</span><span class="n">cells</span><span class="p">,</span>
<span class="n">kwargs</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;table&quot;</span><span class="p">:</span> <span class="n">table</span><span class="p">,</span> <span class="s2">&quot;columns&quot;</span><span class="p">:</span> <span class="n">columns</span><span class="p">,</span> <span class="s2">&quot;values&quot;</span><span class="p">:</span> <span class="n">values</span>
<span class="p">})</span></div>
<div class="viewcode-block" id="WriteMutation.replace"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteMutation.replace">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">replace</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Replace one or more table rows.</span>
<span class="sd"> Args:</span>
<span class="sd"> table: Name of the table to be modified.</span>
<span class="sd"> columns: Names of the table columns to be modified.</span>
<span class="sd"> values: Values to be modified.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">rows</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span>
<span class="n">cells</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">columns</span><span class="p">)</span> <span class="o">*</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_Mutator</span><span class="p">(</span>
<span class="n">mutation</span><span class="o">=</span><span class="n">Mutation</span><span class="p">(</span><span class="n">replace</span><span class="o">=</span><span class="n">batch</span><span class="o">.</span><span class="n">_make_write_pb</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">)),</span>
<span class="n">operation</span><span class="o">=</span><span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_REPLACE</span><span class="p">,</span>
<span class="n">rows</span><span class="o">=</span><span class="n">rows</span><span class="p">,</span>
<span class="n">cells</span><span class="o">=</span><span class="n">cells</span><span class="p">,</span>
<span class="n">kwargs</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;table&quot;</span><span class="p">:</span> <span class="n">table</span><span class="p">,</span> <span class="s2">&quot;columns&quot;</span><span class="p">:</span> <span class="n">columns</span><span class="p">,</span> <span class="s2">&quot;values&quot;</span><span class="p">:</span> <span class="n">values</span>
<span class="p">})</span></div>
<div class="viewcode-block" id="WriteMutation.delete"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteMutation.delete">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">delete</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">keyset</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Delete one or more table rows.</span>
<span class="sd"> Args:</span>
<span class="sd"> table: Name of the table to be modified.</span>
<span class="sd"> keyset: Keys/ranges identifying rows to delete.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">delete</span> <span class="o">=</span> <span class="n">Mutation</span><span class="o">.</span><span class="n">Delete</span><span class="p">(</span><span class="n">table</span><span class="o">=</span><span class="n">table</span><span class="p">,</span> <span class="n">key_set</span><span class="o">=</span><span class="n">keyset</span><span class="o">.</span><span class="n">_to_pb</span><span class="p">())</span>
<span class="k">return</span> <span class="n">_Mutator</span><span class="p">(</span>
<span class="n">mutation</span><span class="o">=</span><span class="n">Mutation</span><span class="p">(</span><span class="n">delete</span><span class="o">=</span><span class="n">delete</span><span class="p">),</span>
<span class="n">rows</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span>
<span class="n">cells</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span>
<span class="n">operation</span><span class="o">=</span><span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_DELETE</span><span class="p">,</span>
<span class="n">kwargs</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;table&quot;</span><span class="p">:</span> <span class="n">table</span><span class="p">,</span> <span class="s2">&quot;keyset&quot;</span><span class="p">:</span> <span class="n">keyset</span>
<span class="p">})</span></div></div>
<span class="nd">@with_input_types</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">Union</span><span class="p">[</span><span class="n">MutationGroup</span><span class="p">,</span> <span class="n">TaggedOutput</span><span class="p">])</span>
<span class="nd">@with_output_types</span><span class="p">(</span><span class="n">MutationGroup</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_BatchFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Batches mutations together.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">max_batch_size_bytes</span><span class="p">,</span> <span class="n">max_number_rows</span><span class="p">,</span> <span class="n">max_number_cells</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span> <span class="o">=</span> <span class="n">max_batch_size_bytes</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span> <span class="o">=</span> <span class="n">max_number_rows</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span> <span class="o">=</span> <span class="n">max_number_cells</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="o">=</span> <span class="n">MutationGroup</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_size_in_bytes</span> <span class="o">=</span> <span class="mi">0</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_rows</span> <span class="o">=</span> <span class="mi">0</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_cells</span> <span class="o">=</span> <span class="mi">0</span>
<span class="k">def</span> <span class="nf">_reset_count</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="o">=</span> <span class="n">MutationGroup</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_size_in_bytes</span> <span class="o">=</span> <span class="mi">0</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_rows</span> <span class="o">=</span> <span class="mi">0</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_cells</span> <span class="o">=</span> <span class="mi">0</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="n">mg_info</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">info</span>
<span class="k">if</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">&#39;byte_size&#39;</span><span class="p">]</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">_size_in_bytes</span> <span class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span> \
<span class="ow">or</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">&#39;cells&#39;</span><span class="p">]</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cells</span> <span class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span> \
<span class="ow">or</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">&#39;rows&#39;</span><span class="p">]</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">_rows</span> <span class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span><span class="p">:</span>
<span class="c1"># Batch is full, output the batch and resetting the count.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="p">:</span>
<span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_reset_count</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="c1"># total byte size of the mutation group.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_size_in_bytes</span> <span class="o">+=</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">&#39;byte_size&#39;</span><span class="p">]</span>
<span class="c1"># total rows in the mutation group.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_rows</span> <span class="o">+=</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">&#39;rows&#39;</span><span class="p">]</span>
<span class="c1"># total cells in the mutation group.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_cells</span> <span class="o">+=</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">&#39;cells&#39;</span><span class="p">]</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="o">=</span> <span class="kc">None</span>
<span class="nd">@with_input_types</span><span class="p">(</span><span class="n">MutationGroup</span><span class="p">)</span>
<span class="nd">@with_output_types</span><span class="p">(</span><span class="n">MutationGroup</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_BatchableFilterFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Filters MutationGroups larger than the batch size to the output tagged with</span>
<span class="sd"> OUTPUT_TAG_UNBATCHABLE.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">OUTPUT_TAG_UNBATCHABLE</span> <span class="o">=</span> <span class="s1">&#39;unbatchable&#39;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">max_batch_size_bytes</span><span class="p">,</span> <span class="n">max_number_rows</span><span class="p">,</span> <span class="n">max_number_cells</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span> <span class="o">=</span> <span class="n">max_batch_size_bytes</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span> <span class="o">=</span> <span class="n">max_number_rows</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span> <span class="o">=</span> <span class="n">max_number_cells</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batchable</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_unbatchable</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="k">if</span> <span class="n">element</span><span class="o">.</span><span class="n">primary</span><span class="p">()</span><span class="o">.</span><span class="n">operation</span> <span class="o">==</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_DELETE</span><span class="p">:</span>
<span class="c1"># As delete mutations are not batchable.</span>
<span class="k">yield</span> <span class="n">TaggedOutput</span><span class="p">(</span><span class="n">_BatchableFilterFn</span><span class="o">.</span><span class="n">OUTPUT_TAG_UNBATCHABLE</span><span class="p">,</span> <span class="n">element</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">mg_info</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">info</span>
<span class="k">if</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">&#39;byte_size&#39;</span><span class="p">]</span> <span class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span> \
<span class="ow">or</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">&#39;cells&#39;</span><span class="p">]</span> <span class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span> \
<span class="ow">or</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">&#39;rows&#39;</span><span class="p">]</span> <span class="o">&gt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">TaggedOutput</span><span class="p">(</span><span class="n">_BatchableFilterFn</span><span class="o">.</span><span class="n">OUTPUT_TAG_UNBATCHABLE</span><span class="p">,</span> <span class="n">element</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">element</span>
<span class="k">class</span> <span class="nc">_WriteToSpannerDoFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">spanner_configuration</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span> <span class="o">=</span> <span class="n">spanner_configuration</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_db_instance</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">batches</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span> <span class="s1">&#39;SpannerBatches&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span> <span class="o">=</span> <span class="p">{</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SERVICE_LABEL</span><span class="p">:</span> <span class="s1">&#39;Spanner&#39;</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">METHOD_LABEL</span><span class="p">:</span> <span class="s1">&#39;Write&#39;</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_PROJECT_ID</span><span class="p">:</span> <span class="n">spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_DATABASE_ID</span><span class="p">:</span> <span class="n">spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">,</span>
<span class="p">}</span>
<span class="c1"># table_id to metrics</span>
<span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">def</span> <span class="nf">_register_table_metric</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">table_id</span><span class="p">):</span>
<span class="k">if</span> <span class="n">table_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span><span class="p">:</span>
<span class="k">return</span>
<span class="n">database_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span>
<span class="n">project_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span>
<span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">SpannerTable</span><span class="p">(</span>
<span class="n">project_id</span><span class="p">,</span> <span class="n">database_id</span><span class="p">,</span> <span class="n">table_id</span><span class="p">)</span>
<span class="n">labels</span> <span class="o">=</span> <span class="p">{</span>
<span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_TABLE_ID</span><span class="p">:</span> <span class="n">table_id</span>
<span class="p">}</span>
<span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span>
<span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span>
<span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span><span class="p">[</span><span class="n">table_id</span><span class="p">]</span> <span class="o">=</span> <span class="n">service_call_metric</span>
<span class="k">def</span> <span class="nf">setup</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">spanner_client</span> <span class="o">=</span> <span class="n">Client</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">)</span>
<span class="n">instance</span> <span class="o">=</span> <span class="n">spanner_client</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">instance</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_db_instance</span> <span class="o">=</span> <span class="n">instance</span><span class="o">.</span><span class="n">database</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">pool</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">batches</span><span class="o">.</span><span class="n">inc</span><span class="p">()</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">_db_instance</span><span class="o">.</span><span class="n">batch</span><span class="p">()</span> <span class="k">as</span> <span class="n">b</span><span class="p">:</span>
<span class="k">for</span> <span class="n">m</span> <span class="ow">in</span> <span class="n">element</span><span class="p">:</span>
<span class="n">table_id</span> <span class="o">=</span> <span class="n">m</span><span class="o">.</span><span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;table&#39;</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_register_table_metric</span><span class="p">(</span><span class="n">table_id</span><span class="p">)</span>
<span class="k">if</span> <span class="n">m</span><span class="o">.</span><span class="n">operation</span> <span class="o">==</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_DELETE</span><span class="p">:</span>
<span class="n">batch_func</span> <span class="o">=</span> <span class="n">b</span><span class="o">.</span><span class="n">delete</span>
<span class="k">elif</span> <span class="n">m</span><span class="o">.</span><span class="n">operation</span> <span class="o">==</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_REPLACE</span><span class="p">:</span>
<span class="n">batch_func</span> <span class="o">=</span> <span class="n">b</span><span class="o">.</span><span class="n">replace</span>
<span class="k">elif</span> <span class="n">m</span><span class="o">.</span><span class="n">operation</span> <span class="o">==</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_INSERT_OR_UPDATE</span><span class="p">:</span>
<span class="n">batch_func</span> <span class="o">=</span> <span class="n">b</span><span class="o">.</span><span class="n">insert_or_update</span>
<span class="k">elif</span> <span class="n">m</span><span class="o">.</span><span class="n">operation</span> <span class="o">==</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_INSERT</span><span class="p">:</span>
<span class="n">batch_func</span> <span class="o">=</span> <span class="n">b</span><span class="o">.</span><span class="n">insert</span>
<span class="k">elif</span> <span class="n">m</span><span class="o">.</span><span class="n">operation</span> <span class="o">==</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_UPDATE</span><span class="p">:</span>
<span class="n">batch_func</span> <span class="o">=</span> <span class="n">b</span><span class="o">.</span><span class="n">update</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;Unknown operation action: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">m</span><span class="o">.</span><span class="n">operation</span><span class="p">)</span>
<span class="n">batch_func</span><span class="p">(</span><span class="o">**</span><span class="n">m</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">except</span> <span class="p">(</span><span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span><span class="p">)</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="k">for</span> <span class="n">service_metric</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="n">service_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">code</span><span class="o">.</span><span class="n">value</span><span class="p">))</span>
<span class="k">raise</span>
<span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="k">for</span> <span class="n">service_metric</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="n">service_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">))</span>
<span class="k">raise</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">for</span> <span class="n">service_metric</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="n">service_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="s1">&#39;ok&#39;</span><span class="p">)</span>
<span class="nd">@with_input_types</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">Union</span><span class="p">[</span><span class="n">MutationGroup</span><span class="p">,</span> <span class="n">_Mutator</span><span class="p">])</span>
<span class="nd">@with_output_types</span><span class="p">(</span><span class="n">MutationGroup</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_MakeMutationGroupsFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Make Mutation group object if the element is the instance of _Mutator.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">MutationGroup</span><span class="p">):</span>
<span class="k">yield</span> <span class="n">element</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">_Mutator</span><span class="p">):</span>
<span class="k">yield</span> <span class="n">MutationGroup</span><span class="p">([</span><span class="n">element</span><span class="p">])</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;Invalid object type: </span><span class="si">%s</span><span class="s2">. Object must be an instance of &quot;</span>
<span class="s2">&quot;MutationGroup or WriteMutations&quot;</span> <span class="o">%</span> <span class="nb">str</span><span class="p">(</span><span class="n">element</span><span class="p">))</span>
<span class="k">class</span> <span class="nc">_WriteGroup</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">max_batch_size_bytes</span><span class="p">,</span> <span class="n">max_number_rows</span><span class="p">,</span> <span class="n">max_number_cells</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span> <span class="o">=</span> <span class="n">max_batch_size_bytes</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span> <span class="o">=</span> <span class="n">max_number_rows</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span> <span class="o">=</span> <span class="n">max_number_cells</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="n">filter_batchable_mutations</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">pcoll</span>
<span class="o">|</span> <span class="s1">&#39;Making mutation groups&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ParDo</span><span class="p">(</span><span class="n">_MakeMutationGroupsFn</span><span class="p">())</span>
<span class="o">|</span> <span class="s1">&#39;Filtering Batchable Mutations&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ParDo</span><span class="p">(</span>
<span class="n">_BatchableFilterFn</span><span class="p">(</span>
<span class="n">max_batch_size_bytes</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span><span class="p">,</span>
<span class="n">max_number_rows</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span><span class="p">,</span>
<span class="n">max_number_cells</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span><span class="p">))</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span>
<span class="n">_BatchableFilterFn</span><span class="o">.</span><span class="n">OUTPUT_TAG_UNBATCHABLE</span><span class="p">,</span> <span class="n">main</span><span class="o">=</span><span class="s1">&#39;batchable&#39;</span><span class="p">)</span>
<span class="p">)</span>
<span class="n">batching_batchables</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">filter_batchable_mutations</span><span class="p">[</span><span class="s1">&#39;batchable&#39;</span><span class="p">]</span>
<span class="o">|</span> <span class="n">ParDo</span><span class="p">(</span>
<span class="n">_BatchFn</span><span class="p">(</span>
<span class="n">max_batch_size_bytes</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span><span class="p">,</span>
<span class="n">max_number_rows</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span><span class="p">,</span>
<span class="n">max_number_cells</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span><span class="p">)))</span>
<span class="k">return</span> <span class="p">((</span>
<span class="n">batching_batchables</span><span class="p">,</span>
<span class="n">filter_batchable_mutations</span><span class="p">[</span><span class="n">_BatchableFilterFn</span><span class="o">.</span><span class="n">OUTPUT_TAG_UNBATCHABLE</span><span class="p">])</span>
<span class="o">|</span> <span class="s1">&#39;Merging batchable and unbatchable&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">Flatten</span><span class="p">())</span>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
&copy; Copyright
</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script type="text/javascript">
// Passing a function to jQuery() registers it as a DOM-ready callback, so the
// call below runs only after the document has been parsed.
jQuery(function () {
// Enable the theme's navigation script. SphinxRtdTheme is presumably the
// global exported by _static/js/theme.js (loaded in <head>); the `true`
// argument looks like the sticky-navigation flag -- TODO confirm in theme.js.
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>