| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <title>apache_beam.io.gcp.experimental.spannerio — Apache Beam 2.38.0 documentation</title> |
| <!-- Theme and Sphinx support scripts; they execute in document order (only MathJax is async), so keep this sequence intact. --> |
| <script src="../../../../../_static/js/modernizr.min.js"></script> |
| <script id="documentation_options" data-url_root="../../../../../" src="../../../../../_static/documentation_options.js"></script> |
| <script src="../../../../../_static/jquery.js"></script> |
| <script src="../../../../../_static/underscore.js"></script> |
| <script src="../../../../../_static/doctools.js"></script> |
| <script src="../../../../../_static/language_data.js"></script> |
| <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
| <script src="../../../../../_static/js/theme.js"></script> |
| <!-- Theme stylesheet, Pygments highlighting styles, and the index/search relations. --> |
| <link rel="stylesheet" href="../../../../../_static/css/theme.css"> |
| <link rel="stylesheet" href="../../../../../_static/pygments.css"> |
| <link rel="index" title="Index" href="../../../../../genindex.html"> |
| <link rel="search" title="Search" href="../../../../../search.html"> |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search"> |
| <a href="../../../../../index.html" class="icon icon-home"> Apache Beam</a> |
| <div class="version"> |
| 2.38.0 |
| </div> |
| <div role="search"> |
| <!-- A placeholder is not an accessible name, so the query field also carries an aria-label. type="text" is left unchanged in case the theme CSS selects on it. --> |
| <form id="rtd-search-form" class="wy-form" action="../../../../../search.html" method="get"> |
| <input type="text" name="q" placeholder="Search docs" aria-label="Search docs"> |
| <input type="hidden" name="check_keywords" value="yes"> |
| <input type="hidden" name="area" value="default"> |
| </form> |
| </div> |
| </div> |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.io.html">apache_beam.io package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.ml.html">apache_beam.ml package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.options.html">apache_beam.options package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
| </ul> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.error.html">apache_beam.error module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
| </ul> |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
| |
| <!-- NOTE(review): the menu toggle below is an <i> icon carrying data-toggle="wy-nav-top" (presumably wired up by theme.js). It has no text and no tabindex, so it is neither keyboard-focusable nor named for screen readers. Consider a <button type="button" aria-label="Toggle navigation"> after confirming the theme CSS/JS do not depend on the <i> element. --> |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="../../../../../index.html">Apache Beam</a> |
| |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| <!-- The last crumb is the current page (aria-current); the » separators are decorative and hidden from assistive technology. Items stay on separate lines so the whitespace between inline list items is preserved. --> |
| <ul class="wy-breadcrumbs"> |
| <li><a href="../../../../../index.html">Docs</a> <span aria-hidden="true">»</span></li> |
| <li><a href="../../../../index.html">Module code</a> <span aria-hidden="true">»</span></li> |
| <li aria-current="page">apache_beam.io.gcp.experimental.spannerio</li> |
| <li class="wy-breadcrumbs-aside"> |
| </li> |
| </ul> |
| <hr> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.io.gcp.experimental.spannerio</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="sd">"""Google Cloud Spanner IO</span> |
| |
| <span class="sd">Experimental; no backwards-compatibility guarantees.</span> |
| |
| <span class="sd">This is an experimental module for reading and writing data from Google Cloud</span> |
| <span class="sd">Spanner. Visit: https://cloud.google.com/spanner for more details.</span> |
| |
| <span class="sd">Reading Data from Cloud Spanner.</span> |
| |
| <span class="sd">To read from Cloud Spanner apply ReadFromSpanner transformation. It will</span> |
| <span class="sd">return a PCollection, where each element represents an individual row returned</span> |
| <span class="sd">from the read operation. Both Query and Read APIs are supported.</span> |
| |
| <span class="sd">ReadFromSpanner relies on the ReadOperation objects exposed by the SpannerIO</span> |
| <span class="sd">API. A ReadOperation holds the immutable data needed to execute batch and</span> |
| <span class="sd">naive reads on Cloud Spanner, which makes reads more convenient to</span> |
| <span class="sd">express.</span> |
| |
| <span class="sd">ReadFromSpanner reads from Cloud Spanner by providing either an 'sql' param</span> |
| <span class="sd">in the constructor or 'table' name with 'columns' as list. For example:::</span> |
| |
| <span class="sd"> records = (pipeline</span> |
| <span class="sd"> | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,</span> |
| <span class="sd"> sql='Select * from users'))</span> |
| |
| <span class="sd"> records = (pipeline</span> |
| <span class="sd"> | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,</span> |
| <span class="sd"> table='users', columns=['id', 'name', 'email']))</span> |
| |
| <span class="sd">You can also perform multiple reads by providing a list of ReadOperations</span> |
| <span class="sd">to the ReadFromSpanner transform constructor. ReadOperation exposes two</span> |
| <span class="sd">classmethods: use 'query' to perform SQL-based reads and 'table' to read</span> |
| <span class="sd">from a table by name. For example:::</span> |
| |
| <span class="sd"> read_operations = [</span> |
| <span class="sd"> ReadOperation.table(table='customers', columns=['name',</span> |
| <span class="sd"> 'email']),</span> |
| <span class="sd"> ReadOperation.table(table='vendors', columns=['name',</span> |
| <span class="sd"> 'email']),</span> |
| <span class="sd"> ]</span> |
| <span class="sd"> all_users = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,</span> |
| <span class="sd"> read_operations=read_operations)</span> |
| |
| <span class="sd"> ...OR...</span> |
| |
| <span class="sd"> read_operations = [</span> |
| <span class="sd"> ReadOperation.query(sql='Select name, email from customers'),</span> |
| <span class="sd"> ReadOperation.query(</span> |
| <span class="sd"> sql='Select * from users where id <= @user_id',</span> |
| <span class="sd"> params={'user_id': 100},</span> |
| <span class="sd"> param_types={'user_id': param_types.INT64}</span> |
| <span class="sd"> ),</span> |
| <span class="sd"> ]</span> |
| <span class="sd"> # `param_types` values are instances of `google.cloud.spanner.param_types`</span> |
| <span class="sd"> all_users = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,</span> |
| <span class="sd"> read_operations=read_operations)</span> |
| |
| <span class="sd">For more information, please review the docs on class ReadOperation.</span> |
| |
| <span class="sd">Users can also provide ReadOperations as a PCollection via the</span> |
| <span class="sd">pipeline. For example:::</span> |
| |
| <span class="sd"> users = (pipeline</span> |
| <span class="sd"> | beam.Create([ReadOperation...])</span> |
| <span class="sd"> | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME))</span> |
| |
| <span class="sd">Users may also create a Cloud Spanner transaction with the</span> |
| <span class="sd">`create_transaction` transform, which is available in the SpannerIO API.</span> |
| |
| <span class="sd">The transform is guaranteed to be executed on a consistent snapshot of data,</span> |
| <span class="sd">utilizing the power of read only transactions. Staleness of data can be</span> |
| <span class="sd">controlled by providing the `read_timestamp` or `exact_staleness` param values</span> |
| <span class="sd">in the constructor.</span> |
| |
| <span class="sd">This transform requires the root of the pipeline (PBegin) and returns a</span> |
| <span class="sd">PTransform, which is later passed to the `ReadFromSpanner` constructor.</span> |
| <span class="sd">`ReadFromSpanner` passes this transaction PTransform as a singleton side</span> |
| <span class="sd">input (containing 'session_id' and 'transaction_id') to</span> |
| <span class="sd">`_NaiveSpannerReadDoFn`. For example:::</span> |
| |
| <span class="sd"> transaction = (pipeline | create_transaction(PROJECT_ID,</span> |
| <span class="sd"> INSTANCE_ID,</span> |
| <span class="sd"> DB_NAME))</span> |
| |
| <span class="sd"> users = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,</span> |
| <span class="sd"> sql='Select * from users', transaction=transaction)</span> |
| |
| <span class="sd"> tweets = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,</span> |
| <span class="sd"> sql='Select * from tweets', transaction=transaction)</span> |
| |
| <span class="sd">For further details of this transform, please review the docs on the</span> |
| <span class="sd">:meth:`create_transaction` method available in the SpannerIO API.</span> |
| |
| <span class="sd">ReadFromSpanner takes this transform in its constructor and passes it to</span> |
| <span class="sd">the read pipeline as a singleton side input.</span> |
| |
| <span class="sd">Writing Data to Cloud Spanner.</span> |
| |
| <span class="sd">The WriteToSpanner transform writes to Cloud Spanner by executing a</span> |
| <span class="sd">collection of input rows (WriteMutation). The mutations are grouped into</span> |
| <span class="sd">batches for efficiency.</span> |
| |
| <span class="sd">The WriteToSpanner transform relies on the WriteMutation objects exposed by</span> |
| <span class="sd">the SpannerIO API. WriteMutation has five static methods (insert, update,</span> |
| <span class="sd">insert_or_update, replace, delete). These methods return an instance of the</span> |
| <span class="sd">_Mutator object, which contains the mutation type and the Spanner Mutation</span> |
| <span class="sd">object. For more details, review the docs of the class SpannerIO.WriteMutation.</span> |
| <span class="sd">For example:::</span> |
| |
| <span class="sd"> mutations = [</span> |
| <span class="sd"> WriteMutation.insert(table='user', columns=('name', 'email'),</span> |
| <span class="sd"> values=[('sara', 'sara@dev.com')])</span> |
| <span class="sd"> ]</span> |
| <span class="sd"> _ = (p</span> |
| <span class="sd"> | beam.Create(mutations)</span> |
| <span class="sd"> | WriteToSpanner(</span> |
| <span class="sd"> project_id=SPANNER_PROJECT_ID,</span> |
| <span class="sd"> instance_id=SPANNER_INSTANCE_ID,</span> |
| <span class="sd"> database_id=SPANNER_DATABASE_NAME)</span> |
| <span class="sd"> )</span> |
| |
| <span class="sd">You can also create WriteMutation via calling its constructor. For example:::</span> |
| |
| <span class="sd"> mutations = [</span> |
| <span class="sd"> WriteMutation(insert='users', columns=('name', 'email'),</span> |
| <span class="sd"> values=[('sara', 'sara@example.com')])</span> |
| <span class="sd"> ]</span> |
| |
| <span class="sd">For more information, review the docs available on WriteMutation class.</span> |
| |
| <span class="sd">The WriteToSpanner transform also takes three batching parameters</span> |
| <span class="sd">(max_number_rows, max_number_cells and max_batch_size_bytes). By default,</span> |
| <span class="sd">max_number_rows is set to 50 rows, max_number_cells is set to 500 cells and</span> |
| <span class="sd">max_batch_size_bytes is set to 1MB (1048576 bytes). These parameters are</span> |
| <span class="sd">used to reduce the number of transactions sent to Spanner by grouping the</span> |
| <span class="sd">mutations into batches. Set them to smaller values for smaller batches, or to</span> |
| <span class="sd">zero to disable batching. Unlike the Java connector, this connector does not</span> |
| <span class="sd">create batches of transactions sorted by table and primary key.</span> |
| |
| <span class="sd">The WriteToSpanner transform starts by grouping mutations into batches. The</span> |
| <span class="sd">first step in this process is to build mutation groups from the WriteMutation</span> |
| <span class="sd">objects and then split them into batchable and unbatchable mutation groups.</span> |
| <span class="sd">There are three batching parameters (max_number_cells, max_number_rows and</span> |
| <span class="sd">max_batch_size_bytes). The mutation byte size is calculated with the method</span> |
| <span class="sd">`google.cloud.spanner_v1.proto.mutation_pb2.Mutation.ByteSize`. If a</span> |
| <span class="sd">mutation's rows, cells or byte size exceed any of the batching parameters,</span> |
| <span class="sd">it is tagged as an "unbatchable" mutation. After this, all the batchable</span> |
| <span class="sd">mutations are merged into a single mutation group whose size is not larger</span> |
| <span class="sd">than "max_batch_size_bytes". Finally, all the mutation groups are combined</span> |
| <span class="sd">for processing. If a mutation references a table or column that does not</span> |
| <span class="sd">exist, it raises an exception that fails the entire pipeline.</span> |
| <span class="sd">"""</span> |
| <span class="kn">import</span> <span class="nn">typing</span> |
| <span class="kn">from</span> <span class="nn">collections</span> <span class="kn">import</span> <span class="n">deque</span> |
| <span class="kn">from</span> <span class="nn">collections</span> <span class="kn">import</span> <span class="n">namedtuple</span> |
| |
| <span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">Create</span> |
| <span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">DoFn</span> |
| <span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">Flatten</span> |
| <span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">ParDo</span> |
| <span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">Reshuffle</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.internal.metrics.metric</span> <span class="kn">import</span> <span class="n">ServiceCallMetric</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp</span> <span class="kn">import</span> <span class="n">resource_identifiers</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.metrics</span> <span class="kn">import</span> <span class="n">Metrics</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.metrics</span> <span class="kn">import</span> <span class="n">monitoring_infos</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="kn">import</span> <span class="n">AsSingleton</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="kn">import</span> <span class="n">PBegin</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="kn">import</span> <span class="n">TaggedOutput</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">PTransform</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">ptransform_fn</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">window</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="kn">import</span> <span class="n">DisplayDataItem</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="kn">import</span> <span class="n">with_input_types</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="kn">import</span> <span class="n">with_output_types</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.utils.annotations</span> <span class="kn">import</span> <span class="n">experimental</span> |
| |
| <span class="c1"># Protect against environments where spanner library is not available.</span> |
| <span class="c1"># pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports</span> |
| <span class="c1"># pylint: disable=unused-import</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">google.cloud.spanner</span> <span class="kn">import</span> <span class="n">Client</span> |
| <span class="kn">from</span> <span class="nn">google.cloud.spanner</span> <span class="kn">import</span> <span class="n">KeySet</span> |
| <span class="kn">from</span> <span class="nn">google.cloud.spanner_v1</span> <span class="kn">import</span> <span class="n">batch</span> |
| <span class="kn">from</span> <span class="nn">google.cloud.spanner_v1.database</span> <span class="kn">import</span> <span class="n">BatchSnapshot</span> |
| <span class="kn">from</span> <span class="nn">google.api_core.exceptions</span> <span class="kn">import</span> <span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span> |
| <span class="kn">from</span> <span class="nn">apitools.base.py.exceptions</span> <span class="kn">import</span> <span class="n">HttpError</span> |
| <span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span> |
| <span class="n">Client</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="n">KeySet</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="n">BatchSnapshot</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">google.cloud.spanner_v1</span> <span class="kn">import</span> <span class="n">Mutation</span> |
| <span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="c1"># Remove this and the try clause when we upgrade to google-cloud-spanner</span> |
| <span class="c1"># 3.x.x.</span> |
| <span class="kn">from</span> <span class="nn">google.cloud.spanner_v1.proto.mutation_pb2</span> <span class="kn">import</span> <span class="n">Mutation</span> |
| <span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span> |
| <span class="c1"># Ignoring for environments where the Spanner library is not available.</span> |
| <span class="k">pass</span> |
| |
| <span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="s1">'create_transaction'</span><span class="p">,</span> |
| <span class="s1">'ReadFromSpanner'</span><span class="p">,</span> |
| <span class="s1">'ReadOperation'</span><span class="p">,</span> |
| <span class="s1">'WriteToSpanner'</span><span class="p">,</span> |
| <span class="s1">'WriteMutation'</span><span class="p">,</span> |
| <span class="s1">'MutationGroup'</span> |
| <span class="p">]</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_SPANNER_TRANSACTION</span><span class="p">(</span><span class="n">namedtuple</span><span class="p">(</span><span class="s2">"SPANNER_TRANSACTION"</span><span class="p">,</span> <span class="p">[</span><span class="s2">"transaction"</span><span class="p">])):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Holds the spanner transaction details.</span> |
| <span class="sd"> """</span> |
| |
| <span class="vm">__slots__</span> <span class="o">=</span> <span class="p">()</span> |
| |
| |
| <div class="viewcode-block" id="ReadOperation"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.ReadOperation">[docs]</a><span class="k">class</span> <span class="nc">ReadOperation</span><span class="p">(</span><span class="n">namedtuple</span><span class="p">(</span> |
| <span class="s2">"ReadOperation"</span><span class="p">,</span> <span class="p">[</span><span class="s2">"is_sql"</span><span class="p">,</span> <span class="s2">"is_table"</span><span class="p">,</span> <span class="s2">"read_operation"</span><span class="p">,</span> <span class="s2">"kwargs"</span><span class="p">])):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Encapsulates a spanner read operation.</span> |
| <span class="sd"> """</span> |
| |
| <span class="vm">__slots__</span> <span class="o">=</span> <span class="p">()</span> |
| |
| <div class="viewcode-block" id="ReadOperation.query"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.ReadOperation.query">[docs]</a> <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">query</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">sql</span><span class="p">,</span> <span class="n">params</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">param_types</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A convenience method to construct a ReadOperation from a SQL query.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> sql: SQL query statement</span> |
| <span class="sd"> params: (optional) values for parameter replacement. Keys must match the</span> |
| <span class="sd"> names used in sql</span> |
| <span class="sd"> param_types: (optional) maps explicit types for one or more param values;</span> |
| <span class="sd"> required if parameters are passed.</span> |
| <span class="sd"> """</span> |
| |
| <span class="k">if</span> <span class="n">params</span><span class="p">:</span> |
| <span class="k">assert</span> <span class="n">param_types</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| |
| <span class="k">return</span> <span class="bp">cls</span><span class="p">(</span> |
| <span class="n">is_sql</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">is_table</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">read_operation</span><span class="o">=</span><span class="s2">"process_query_batch"</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="o">=</span><span class="p">{</span> |
| <span class="s1">'sql'</span><span class="p">:</span> <span class="n">sql</span><span class="p">,</span> <span class="s1">'params'</span><span class="p">:</span> <span class="n">params</span><span class="p">,</span> <span class="s1">'param_types'</span><span class="p">:</span> <span class="n">param_types</span> |
| <span class="p">})</span></div> |
| |
| <div class="viewcode-block" id="ReadOperation.table"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.ReadOperation.table">[docs]</a> <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">table</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">index</span><span class="o">=</span><span class="s2">""</span><span class="p">,</span> <span class="n">keyset</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A convenience method to construct a ReadOperation from a table.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> table: name of the table from which to fetch data.</span> |
| <span class="sd"> columns: names of columns to be retrieved.</span> |
| <span class="sd"> index: (optional) name of index to use, rather than the table's primary</span> |
| <span class="sd"> key.</span> |
| <span class="sd"> keyset: (optional) `KeySet` keys / ranges identifying rows to be</span> |
| <span class="sd"> retrieved.</span> |
| <span class="sd"> """</span> |
| <span class="n">keyset</span> <span class="o">=</span> <span class="n">keyset</span> <span class="ow">or</span> <span class="n">KeySet</span><span class="p">(</span><span class="n">all_</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">keyset</span><span class="p">,</span> <span class="n">KeySet</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"keyset must be an instance of class "</span> |
| <span class="s2">"google.cloud.spanner.KeySet"</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">cls</span><span class="p">(</span> |
| <span class="n">is_sql</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">is_table</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">read_operation</span><span class="o">=</span><span class="s2">"process_read_batch"</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="o">=</span><span class="p">{</span> |
| <span class="s1">'table'</span><span class="p">:</span> <span class="n">table</span><span class="p">,</span> |
| <span class="s1">'columns'</span><span class="p">:</span> <span class="n">columns</span><span class="p">,</span> |
| <span class="s1">'index'</span><span class="p">:</span> <span class="n">index</span><span class="p">,</span> |
| <span class="s1">'keyset'</span><span class="p">:</span> <span class="n">keyset</span> |
| <span class="p">})</span></div></div> |
| |
| |
| <span class="k">class</span> <span class="nc">_BeamSpannerConfiguration</span><span class="p">(</span><span class="n">namedtuple</span><span class="p">(</span><span class="s2">"_BeamSpannerConfiguration"</span><span class="p">,</span> |
| <span class="p">[</span><span class="s2">"project"</span><span class="p">,</span> |
| <span class="s2">"instance"</span><span class="p">,</span> |
| <span class="s2">"database"</span><span class="p">,</span> |
| <span class="s2">"table"</span><span class="p">,</span> |
| <span class="s2">"query_name"</span><span class="p">,</span> |
| <span class="s2">"credentials"</span><span class="p">,</span> |
| <span class="s2">"pool"</span><span class="p">,</span> |
| <span class="s2">"snapshot_read_timestamp"</span><span class="p">,</span> |
| <span class="s2">"snapshot_exact_staleness"</span><span class="p">])):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A namedtuple holding the immutable configuration used to connect to and</span> |
| <span class="sd"> read from Cloud Spanner, including the snapshot (staleness) options.</span> |
| <span class="sd"> """</span> |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">snapshot_options</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">snapshot_options</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">snapshot_exact_staleness</span><span class="p">:</span> |
| <span class="n">snapshot_options</span><span class="p">[</span><span class="s1">'exact_staleness'</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">snapshot_exact_staleness</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">snapshot_read_timestamp</span><span class="p">:</span> |
| <span class="n">snapshot_options</span><span class="p">[</span><span class="s1">'read_timestamp'</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">snapshot_read_timestamp</span> |
| <span class="k">return</span> <span class="n">snapshot_options</span> |
| |
| |
| <span class="nd">@with_input_types</span><span class="p">(</span><span class="n">ReadOperation</span><span class="p">,</span> <span class="n">_SPANNER_TRANSACTION</span><span class="p">)</span> |
| <span class="nd">@with_output_types</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">List</span><span class="p">[</span><span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">])</span> |
| <span class="k">class</span> <span class="nc">_NaiveSpannerReadDoFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">spanner_configuration</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A naive version of Spanner read which uses the transaction API of the</span> |
| <span class="sd"> cloud spanner.</span> |
| <span class="sd"> https://googleapis.dev/python/spanner/latest/transaction-api.html</span> |
| <span class="sd"> In Naive reads, this transform performs single reads, where as the</span> |
| <span class="sd"> Batch reads use the spanner partitioning query to create batches.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> spanner_configuration: (_BeamSpannerConfiguration) Connection details to</span> |
| <span class="sd"> connect with cloud spanner.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span> <span class="o">=</span> <span class="n">spanner_configuration</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_session</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SERVICE_LABEL</span><span class="p">:</span> <span class="s1">'Spanner'</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">METHOD_LABEL</span><span class="p">:</span> <span class="s1">'Read'</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_PROJECT_ID</span><span class="p">:</span> <span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">),</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_DATABASE_ID</span><span class="p">:</span> <span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">),</span> |
| <span class="p">}</span> |
| |
| <span class="k">def</span> <span class="nf">_table_metric</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">table_id</span><span class="p">,</span> <span class="n">status</span><span class="p">):</span> |
| <span class="n">database_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span> |
| <span class="n">project_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span> |
| <span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">SpannerTable</span><span class="p">(</span> |
| <span class="n">project_id</span><span class="p">,</span> <span class="n">database_id</span><span class="p">,</span> <span class="n">table_id</span><span class="p">)</span> |
| <span class="n">labels</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_TABLE_ID</span><span class="p">:</span> <span class="n">table_id</span> |
| <span class="p">}</span> |
| <span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span> |
| <span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span> |
| <span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span> |
| <span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">status</span><span class="p">))</span> |
| |
| <span class="k">def</span> <span class="nf">_query_metric</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">query_name</span><span class="p">,</span> <span class="n">status</span><span class="p">):</span> |
| <span class="n">project_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span> |
| <span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">SpannerSqlQuery</span><span class="p">(</span><span class="n">project_id</span><span class="p">,</span> <span class="n">query_name</span><span class="p">)</span> |
| <span class="n">labels</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_QUERY_NAME</span><span class="p">:</span> <span class="n">query_name</span> |
| <span class="p">}</span> |
| <span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span> |
| <span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span> |
| <span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span> |
| <span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">status</span><span class="p">))</span> |
| |
| <span class="k">def</span> <span class="nf">_get_session</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_session</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">session</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_session</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_database</span><span class="o">.</span><span class="n">session</span><span class="p">()</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">create</span><span class="p">()</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_session</span> |
| |
| <span class="k">def</span> <span class="nf">_close_session</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_session</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_session</span><span class="o">.</span><span class="n">delete</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">setup</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># setting up client to connect with cloud spanner</span> |
| <span class="n">spanner_client</span> <span class="o">=</span> <span class="n">Client</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">)</span> |
| <span class="n">instance</span> <span class="o">=</span> <span class="n">spanner_client</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">instance</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_database</span> <span class="o">=</span> <span class="n">instance</span><span class="o">.</span><span class="n">database</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">pool</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">spanner_transaction</span><span class="p">):</span> |
| <span class="c1"># `spanner_transaction` should be the instance of the _SPANNER_TRANSACTION</span> |
| <span class="c1"># object.</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">spanner_transaction</span><span class="p">,</span> <span class="n">_SPANNER_TRANSACTION</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"Invalid transaction object: </span><span class="si">%s</span><span class="s2">. It should be instance "</span> |
| <span class="s2">"of SPANNER_TRANSACTION object created by "</span> |
| <span class="s2">"spannerio.create_transaction transform."</span> <span class="o">%</span> <span class="nb">type</span><span class="p">(</span><span class="n">spanner_transaction</span><span class="p">))</span> |
| |
| <span class="n">transaction_info</span> <span class="o">=</span> <span class="n">spanner_transaction</span><span class="o">.</span><span class="n">transaction</span> |
| |
| <span class="c1"># We used batch snapshot to reuse the same transaction passed through the</span> |
| <span class="c1"># side input</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="n">BatchSnapshot</span><span class="o">.</span><span class="n">from_dict</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_database</span><span class="p">,</span> <span class="n">transaction_info</span><span class="p">)</span> |
| |
| <span class="c1"># getting the transaction from the snapshot's session to run read operation.</span> |
| <span class="c1"># with self._snapshot.session().transaction() as transaction:</span> |
| <span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_session</span><span class="p">()</span><span class="o">.</span><span class="n">transaction</span><span class="p">()</span> <span class="k">as</span> <span class="n">transaction</span><span class="p">:</span> |
| <span class="n">table_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">table</span> |
| <span class="n">query_name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">query_name</span> <span class="ow">or</span> <span class="s1">''</span> |
| |
| <span class="k">if</span> <span class="n">element</span><span class="o">.</span><span class="n">is_sql</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="n">transaction_read</span> <span class="o">=</span> <span class="n">transaction</span><span class="o">.</span><span class="n">execute_sql</span> |
| <span class="n">metric_action</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_query_metric</span> |
| <span class="n">metric_id</span> <span class="o">=</span> <span class="n">query_name</span> |
| <span class="k">elif</span> <span class="n">element</span><span class="o">.</span><span class="n">is_table</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="n">transaction_read</span> <span class="o">=</span> <span class="n">transaction</span><span class="o">.</span><span class="n">read</span> |
| <span class="n">metric_action</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_table_metric</span> |
| <span class="n">metric_id</span> <span class="o">=</span> <span class="n">table_id</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"ReadOperation is improperly configure: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="nb">str</span><span class="p">(</span><span class="n">element</span><span class="p">))</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">row</span> <span class="ow">in</span> <span class="n">transaction_read</span><span class="p">(</span><span class="o">**</span><span class="n">element</span><span class="o">.</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">yield</span> <span class="n">row</span> |
| |
| <span class="n">metric_action</span><span class="p">(</span><span class="n">metric_id</span><span class="p">,</span> <span class="s1">'ok'</span><span class="p">)</span> |
| <span class="k">except</span> <span class="p">(</span><span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span><span class="p">)</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="n">metric_action</span><span class="p">(</span><span class="n">metric_id</span><span class="p">,</span> <span class="n">e</span><span class="o">.</span><span class="n">code</span><span class="o">.</span><span class="n">value</span><span class="p">)</span> |
| <span class="k">raise</span> |
| <span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="n">metric_action</span><span class="p">(</span><span class="n">metric_id</span><span class="p">,</span> <span class="n">e</span><span class="p">)</span> |
| <span class="k">raise</span> |
| |
| |
| <span class="nd">@with_input_types</span><span class="p">(</span><span class="n">ReadOperation</span><span class="p">)</span> |
| <span class="nd">@with_output_types</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">Dict</span><span class="p">[</span><span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">])</span> |
| <span class="k">class</span> <span class="nc">_CreateReadPartitions</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A DoFn to create partitions. Uses the Partitioning API (PartitionRead /</span> |
| <span class="sd"> PartitionQuery) request to start a partitioned query operation. Returns a</span> |
| <span class="sd"> list of batch information needed to perform the actual queries.</span> |
| |
| <span class="sd"> If the element is the instance of :class:`ReadOperation` is to perform sql</span> |
| <span class="sd"> query, `PartitionQuery` API is used the create partitions and returns mappings</span> |
| <span class="sd"> of information used perform actual partitioned reads via</span> |
| <span class="sd"> :meth:`process_query_batch`.</span> |
| |
| <span class="sd"> If the element is the instance of :class:`ReadOperation` is to perform read</span> |
| <span class="sd"> from table, `PartitionRead` API is used the create partitions and returns</span> |
| <span class="sd"> mappings of information used perform actual partitioned reads via</span> |
| <span class="sd"> :meth:`process_read_batch`.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">spanner_configuration</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span> <span class="o">=</span> <span class="n">spanner_configuration</span> |
| |
| <span class="k">def</span> <span class="nf">setup</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">spanner_client</span> <span class="o">=</span> <span class="n">Client</span><span class="p">(</span> |
| <span class="n">project</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> |
| <span class="n">credentials</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">credentials</span><span class="p">)</span> |
| <span class="n">instance</span> <span class="o">=</span> <span class="n">spanner_client</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">instance</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_database</span> <span class="o">=</span> <span class="n">instance</span><span class="o">.</span><span class="n">database</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">pool</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_database</span><span class="o">.</span><span class="n">batch_snapshot</span><span class="p">(</span> |
| <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">snapshot_options</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot_dict</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">to_dict</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">element</span><span class="o">.</span><span class="n">is_sql</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="n">partitioning_action</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">generate_query_batches</span> |
| <span class="k">elif</span> <span class="n">element</span><span class="o">.</span><span class="n">is_table</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="n">partitioning_action</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">generate_read_batches</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"ReadOperation is improperly configure: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="nb">str</span><span class="p">(</span><span class="n">element</span><span class="p">))</span> |
| |
| <span class="k">for</span> <span class="n">p</span> <span class="ow">in</span> <span class="n">partitioning_action</span><span class="p">(</span><span class="o">**</span><span class="n">element</span><span class="o">.</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">yield</span> <span class="p">{</span> |
| <span class="s2">"is_sql"</span><span class="p">:</span> <span class="n">element</span><span class="o">.</span><span class="n">is_sql</span><span class="p">,</span> |
| <span class="s2">"is_table"</span><span class="p">:</span> <span class="n">element</span><span class="o">.</span><span class="n">is_table</span><span class="p">,</span> |
| <span class="s2">"read_operation"</span><span class="p">:</span> <span class="n">element</span><span class="o">.</span><span class="n">read_operation</span><span class="p">,</span> |
| <span class="s2">"partitions"</span><span class="p">:</span> <span class="n">p</span><span class="p">,</span> |
| <span class="s2">"transaction_info"</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot_dict</span> |
| <span class="p">}</span> |
| |
| |
| <span class="nd">@with_input_types</span><span class="p">(</span><span class="nb">int</span><span class="p">)</span> |
| <span class="nd">@with_output_types</span><span class="p">(</span><span class="n">_SPANNER_TRANSACTION</span><span class="p">)</span> |
| <span class="k">class</span> <span class="nc">_CreateTransactionFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A DoFn to create the transaction of cloud spanner.</span> |
| <span class="sd"> It connects to the database and and returns the transaction_id and session_id</span> |
| <span class="sd"> by using the batch_snapshot.to_dict() method available in the google cloud</span> |
| <span class="sd"> spanner sdk.</span> |
| |
| <span class="sd"> https://googleapis.dev/python/spanner/latest/database-api.html?highlight=</span> |
| <span class="sd"> batch_snapshot#google.cloud.spanner_v1.database.BatchSnapshot.to_dict</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">project_id</span><span class="p">,</span> |
| <span class="n">instance_id</span><span class="p">,</span> |
| <span class="n">database_id</span><span class="p">,</span> |
| <span class="n">credentials</span><span class="p">,</span> |
| <span class="n">pool</span><span class="p">,</span> |
| <span class="n">read_timestamp</span><span class="p">,</span> |
| <span class="n">exact_staleness</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_project_id</span> <span class="o">=</span> <span class="n">project_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_instance_id</span> <span class="o">=</span> <span class="n">instance_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_database_id</span> <span class="o">=</span> <span class="n">database_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_credentials</span> <span class="o">=</span> <span class="n">credentials</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_pool</span> <span class="o">=</span> <span class="n">pool</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot_options</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="k">if</span> <span class="n">read_timestamp</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot_options</span><span class="p">[</span><span class="s1">'read_timestamp'</span><span class="p">]</span> <span class="o">=</span> <span class="n">read_timestamp</span> |
| <span class="k">if</span> <span class="n">exact_staleness</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot_options</span><span class="p">[</span><span class="s1">'exact_staleness'</span><span class="p">]</span> <span class="o">=</span> <span class="n">exact_staleness</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">def</span> <span class="nf">setup</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_client</span> <span class="o">=</span> <span class="n">Client</span><span class="p">(</span> |
| <span class="n">project</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_project_id</span><span class="p">,</span> <span class="n">credentials</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_credentials</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_instance</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_client</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_instance_id</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_database</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_instance</span><span class="o">.</span><span class="n">database</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_database_id</span><span class="p">,</span> <span class="n">pool</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_pool</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_database</span><span class="o">.</span><span class="n">batch_snapshot</span><span class="p">(</span><span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">_snapshot_options</span><span class="p">)</span> |
| <span class="k">return</span> <span class="p">[</span><span class="n">_SPANNER_TRANSACTION</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">to_dict</span><span class="p">())]</span> |
| |
| |
| <div class="viewcode-block" id="create_transaction"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.create_transaction">[docs]</a><span class="nd">@ptransform_fn</span> |
| <span class="k">def</span> <span class="nf">create_transaction</span><span class="p">(</span> |
| <span class="n">pbegin</span><span class="p">,</span> |
| <span class="n">project_id</span><span class="p">,</span> |
| <span class="n">instance_id</span><span class="p">,</span> |
| <span class="n">database_id</span><span class="p">,</span> |
| <span class="n">credentials</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">read_timestamp</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">exact_staleness</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A PTransform method to create a batch transaction.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> pbegin: Root of the pipeline</span> |
| <span class="sd"> project_id: Cloud spanner project id. Be sure to use the Project ID,</span> |
| <span class="sd"> not the Project Number.</span> |
| <span class="sd"> instance_id: Cloud spanner instance id.</span> |
| <span class="sd"> database_id: Cloud spanner database id.</span> |
| <span class="sd"> credentials: (optional) The authorization credentials to attach to requests.</span> |
| <span class="sd"> These credentials identify this application to the service.</span> |
| <span class="sd"> If none are specified, the client will attempt to ascertain</span> |
| <span class="sd"> the credentials from the environment.</span> |
| <span class="sd"> pool: (optional) session pool to be used by database. If not passed,</span> |
| <span class="sd"> Spanner Cloud SDK uses the BurstyPool by default.</span> |
| <span class="sd"> `google.cloud.spanner.BurstyPool`. Ref:</span> |
| <span class="sd"> https://googleapis.dev/python/spanner/latest/database-api.html?#google.</span> |
| <span class="sd"> cloud.spanner_v1.database.Database</span> |
| <span class="sd"> read_timestamp: (optional) An instance of the `datetime.datetime` object to</span> |
| <span class="sd"> execute all reads at the given timestamp.</span> |
| <span class="sd"> exact_staleness: (optional) An instance of the `datetime.timedelta`</span> |
| <span class="sd"> object. These timestamp bounds execute reads at a user-specified</span> |
| <span class="sd"> timestamp.</span> |
| <span class="sd"> """</span> |
| |
| <span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pbegin</span><span class="p">,</span> <span class="n">PBegin</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="n">pbegin</span> <span class="o">|</span> <span class="n">Create</span><span class="p">([</span><span class="mi">1</span><span class="p">])</span> <span class="o">|</span> <span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">_CreateTransactionFn</span><span class="p">(</span> |
| <span class="n">project_id</span><span class="p">,</span> |
| <span class="n">instance_id</span><span class="p">,</span> |
| <span class="n">database_id</span><span class="p">,</span> |
| <span class="n">credentials</span><span class="p">,</span> |
| <span class="n">pool</span><span class="p">,</span> |
| <span class="n">read_timestamp</span><span class="p">,</span> |
| <span class="n">exact_staleness</span><span class="p">)))</span></div> |
| |
| |
| <span class="nd">@with_input_types</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">Dict</span><span class="p">[</span><span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">])</span> |
| <span class="nd">@with_output_types</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">List</span><span class="p">[</span><span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">])</span> |
| <span class="k">class</span> <span class="nc">_ReadFromPartitionFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span><!-- Sphinx viewcode rendering of the Python module apache_beam.io.gcp.experimental.spannerio; change the .py source and rebuild the docs instead of editing this markup. _ReadFromPartitionFn: DoFn that rebuilds a BatchSnapshot from each element's transaction_info and yields the rows read from element['partitions']. --> |
| <span class="sd">"""</span> |
| <span class="sd"> A DoFn to perform reads from the partition.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">spanner_configuration</span><span class="p">):</span><!-- Keeps the configuration and precomputes the monitoring labels (service, method, project id, database id) shared by every request metric; service_metric is assigned per element in process(). --> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span> <span class="o">=</span> <span class="n">spanner_configuration</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SERVICE_LABEL</span><span class="p">:</span> <span class="s1">'Spanner'</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">METHOD_LABEL</span><span class="p">:</span> <span class="s1">'Read'</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_PROJECT_ID</span><span class="p">:</span> <span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">),</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_DATABASE_ID</span><span class="p">:</span> <span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">),</span> |
| <span class="p">}</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">service_metric</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">def</span> <span class="nf">_table_metric</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">table_id</span><span class="p">):</span><!-- Returns a ServiceCallMetric that counts API requests for reads of table_id, labelled with a SpannerTable resource identifier. --> |
| <span class="n">database_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span> |
| <span class="n">project_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span> |
| <span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">SpannerTable</span><span class="p">(</span> |
| <span class="n">project_id</span><span class="p">,</span> <span class="n">database_id</span><span class="p">,</span> <span class="n">table_id</span><span class="p">)</span> |
| <span class="n">labels</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_TABLE_ID</span><span class="p">:</span> <span class="n">table_id</span> |
| <span class="p">}</span> |
| <span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span> |
| <span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span> |
| <span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">service_call_metric</span> |
| |
| <span class="k">def</span> <span class="nf">_query_metric</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">query_name</span><span class="p">):</span><!-- Returns a ServiceCallMetric that counts API requests for SQL reads, labelled with a SpannerSqlQuery resource identifier and query_name. NOTE(review): unlike the table metric, no database id is passed to SpannerSqlQuery; confirm that is intended. --> |
| <span class="n">project_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span> |
| <span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">SpannerSqlQuery</span><span class="p">(</span><span class="n">project_id</span><span class="p">,</span> <span class="n">query_name</span><span class="p">)</span> |
| <span class="n">labels</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_QUERY_NAME</span><span class="p">:</span> <span class="n">query_name</span> |
| <span class="p">}</span> |
| <span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span> |
| <span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span> |
| <span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">service_call_metric</span> |
| |
| <span class="k">def</span> <span class="nf">setup</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- Creates the Spanner client, instance and database handles (with the configured session pool) and an initial batch snapshot using the configured snapshot options. --> |
| <span class="n">spanner_client</span> <span class="o">=</span> <span class="n">Client</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">)</span><!-- NOTE(review): only the project is passed to Client; the credentials that ReadFromSpanner stores in the configuration are not used here. Confirm against the upstream module. --> |
| <span class="n">instance</span> <span class="o">=</span> <span class="n">spanner_client</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">instance</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_database</span> <span class="o">=</span> <span class="n">instance</span><span class="o">.</span><span class="n">database</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">pool</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_database</span><span class="o">.</span><span class="n">batch_snapshot</span><span class="p">(</span> |
| <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">snapshot_options</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span><!-- Reads one batch of partitions: picks the read method and request metric from the is_sql / is_table flags, yields every row, then records the call outcome on the metric. --> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span> <span class="o">=</span> <span class="n">BatchSnapshot</span><span class="o">.</span><span class="n">from_dict</span><span class="p">(</span><!-- NOTE(review): this replaces the snapshot opened in setup() without closing it; only the most recent snapshot is closed in teardown(). --> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_database</span><span class="p">,</span> <span class="n">element</span><span class="p">[</span><span class="s1">'transaction_info'</span><span class="p">])</span> |
| |
| <span class="n">table_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">table</span> |
| <span class="n">query_name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">query_name</span> <span class="ow">or</span> <span class="s1">''</span> |
| |
| <span class="k">if</span> <span class="n">element</span><span class="p">[</span><span class="s1">'is_sql'</span><span class="p">]</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="n">read_action</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">process_query_batch</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">service_metric</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_query_metric</span><span class="p">(</span><span class="n">query_name</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">element</span><span class="p">[</span><span class="s1">'is_table'</span><span class="p">]</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="n">read_action</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">process_read_batch</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">service_metric</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_table_metric</span><span class="p">(</span><span class="n">table_id</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"ReadOperation is improperly configure: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="nb">str</span><span class="p">(</span><span class="n">element</span><span class="p">))</span><!-- NOTE(review): the message says "improperly configure"; "configured" was presumably intended. --> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">row</span> <span class="ow">in</span> <span class="n">read_action</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="s1">'partitions'</span><span class="p">]):</span> |
| <span class="k">yield</span> <span class="n">row</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">service_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="s1">'ok'</span><span class="p">)</span><!-- Success is recorded only after every row has been yielded. --> |
| <span class="k">except</span> <span class="p">(</span><span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span><span class="p">)</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">service_metric</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">code</span><span class="o">.</span><span class="n">value</span><span class="p">))</span><!-- NOTE(review): this calls self.service_metric(...) directly, whereas the success path uses self.service_metric.call('ok'); service_metric holds the ServiceCallMetric built by _query_metric/_table_metric, so this presumably should be .call(str(e.code.value)), otherwise the metric update itself may fail and mask the original error. Fix in the upstream module. --> |
| <span class="k">raise</span> |
| <span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">service_metric</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">))</span><!-- NOTE(review): presumably should be self.service_metric.call(str(e)), matching the success path. --> |
| <span class="k">raise</span> |
| |
| <span class="k">def</span> <span class="nf">teardown</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- Closes the current snapshot, if any. NOTE(review): _snapshot is first assigned in setup(), not __init__, so this raises AttributeError if setup() failed before assigning it. --> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_snapshot</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| |
| |
| <div class="viewcode-block" id="ReadFromSpanner"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.ReadFromSpanner">[docs]</a><span class="nd">@experimental</span><span class="p">(</span><span class="n">extra_message</span><span class="o">=</span><span class="s2">"No backwards-compatibility guarantees."</span><span class="p">)</span><!-- Sphinx viewcode rendering of ReadFromSpanner; change the .py source and rebuild the docs instead of editing this markup. --> |
| <span class="k">class</span> <span class="nc">ReadFromSpanner</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A PTransform to perform reads from cloud spanner.</span> |
| <span class="sd"> ReadFromSpanner uses BatchAPI to perform all read operations.</span> |
| <span class="sd"> """</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">project_id</span><span class="p">,</span> <span class="n">instance_id</span><span class="p">,</span> <span class="n">database_id</span><span class="p">,</span> <span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span><!-- Builds the _BeamSpannerConfiguration and, unless read_operations is supplied, derives a single ReadOperation from table (which requires columns) or from sql. --> |
| <span class="n">read_timestamp</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">exact_staleness</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">credentials</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">sql</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">params</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">param_types</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># with_query</span> |
| <span class="n">table</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">query_name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">index</span><span class="o">=</span><span class="s2">""</span><span class="p">,</span><!-- NOTE(review): index defaults to "" here although the docstring below says None, and query_name is not described in the docstring. --> |
| <span class="n">keyset</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># with_table</span> |
| <span class="n">read_operations</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># for read all</span> |
| <span class="n">transaction</span><span class="o">=</span><span class="kc">None</span> |
| <span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A PTransform that uses Spanner Batch API to perform reads.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> project_id: Cloud spanner project id. Be sure to use the Project ID,</span> |
| <span class="sd"> not the Project Number.</span> |
| <span class="sd"> instance_id: Cloud spanner instance id.</span> |
| <span class="sd"> database_id: Cloud spanner database id.</span> |
| <span class="sd"> pool: (optional) session pool to be used by database. If not passed,</span> |
| <span class="sd"> Spanner Cloud SDK uses the BurstyPool by default.</span> |
| <span class="sd"> `google.cloud.spanner.BurstyPool`. Ref:</span> |
| <span class="sd"> https://googleapis.dev/python/spanner/latest/database-api.html?#google.</span> |
| <span class="sd"> cloud.spanner_v1.database.Database</span> |
| <span class="sd"> read_timestamp: (optional) An instance of the `datetime.datetime` object</span> |
| <span class="sd"> to execute all reads at the given timestamp. By default, set to `None`.</span> |
| <span class="sd"> exact_staleness: (optional) An instance of the `datetime.timedelta`</span> |
| <span class="sd"> object. These timestamp bounds execute reads at a user-specified</span> |
| <span class="sd"> timestamp. By default, set to `None`.</span> |
| <span class="sd"> credentials: (optional) The authorization credentials to attach to</span> |
| <span class="sd"> requests. These credentials identify this application to the service.</span> |
| <span class="sd"> If none are specified, the client will attempt to ascertain</span> |
| <span class="sd"> the credentials from the environment. By default, set to `None`.</span> |
| <span class="sd"> sql: (optional) SQL query statement.</span> |
| <span class="sd"> params: (optional) Values for parameter replacement. Keys must match the</span> |
| <span class="sd"> names used in sql. By default, set to `None`.</span> |
| <span class="sd"> param_types: (optional) maps explicit types for one or more param values;</span> |
| <span class="sd"> required if params are passed. By default, set to `None`.</span> |
| <span class="sd"> table: (optional) Name of the table from which to fetch data. By</span> |
| <span class="sd"> default, set to `None`.</span> |
| <span class="sd"> columns: (optional) List of names of columns to be retrieved; required if</span> |
| <span class="sd"> the table is passed. By default, set to `None`.</span> |
| <span class="sd"> index: (optional) name of index to use, rather than the table's primary</span> |
| <span class="sd"> key. By default, set to `None`.</span> |
| <span class="sd"> keyset: (optional) keys / ranges identifying rows to be retrieved. By</span> |
| <span class="sd"> default, set to `None`.</span> |
| <span class="sd"> read_operations: (optional) List of the objects of :class:`ReadOperation`</span> |
| <span class="sd"> to perform read all. By default, set to `None`.</span> |
| <span class="sd"> transaction: (optional) PTransform of the :meth:`create_transaction` to</span> |
| <span class="sd"> perform naive read on cloud spanner. By default, set to `None`.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_configuration</span> <span class="o">=</span> <span class="n">_BeamSpannerConfiguration</span><span class="p">(</span> |
| <span class="n">project</span><span class="o">=</span><span class="n">project_id</span><span class="p">,</span> |
| <span class="n">instance</span><span class="o">=</span><span class="n">instance_id</span><span class="p">,</span> |
| <span class="n">database</span><span class="o">=</span><span class="n">database_id</span><span class="p">,</span> |
| <span class="n">table</span><span class="o">=</span><span class="n">table</span><span class="p">,</span> |
| <span class="n">query_name</span><span class="o">=</span><span class="n">query_name</span><span class="p">,</span> |
| <span class="n">credentials</span><span class="o">=</span><span class="n">credentials</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span> |
| <span class="n">snapshot_read_timestamp</span><span class="o">=</span><span class="n">read_timestamp</span><span class="p">,</span> |
| <span class="n">snapshot_exact_staleness</span><span class="o">=</span><span class="n">exact_staleness</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="o">=</span> <span class="n">read_operations</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_transaction</span> <span class="o">=</span> <span class="n">transaction</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span><!-- table is checked before sql, so it wins when both are given; with neither, _read_operations stays None and expand() raises ValueError at the pipeline root. --> |
| <span class="k">if</span> <span class="n">table</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">columns</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Columns are required with the table name."</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="n">ReadOperation</span><span class="o">.</span><span class="n">table</span><span class="p">(</span> |
| <span class="n">table</span><span class="o">=</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="n">columns</span><span class="p">,</span> <span class="n">index</span><span class="o">=</span><span class="n">index</span><span class="p">,</span> <span class="n">keyset</span><span class="o">=</span><span class="n">keyset</span><span class="p">)</span> |
| <span class="p">]</span> |
| <span class="k">elif</span> <span class="n">sql</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="n">ReadOperation</span><span class="o">.</span><span class="n">query</span><span class="p">(</span> |
| <span class="n">sql</span><span class="o">=</span><span class="n">sql</span><span class="p">,</span> <span class="n">params</span><span class="o">=</span><span class="n">params</span><span class="p">,</span> <span class="n">param_types</span><span class="o">=</span><span class="n">param_types</span><span class="p">)</span> |
| <span class="p">]</span> |
| |
| <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pbegin</span><span class="p">):</span><!-- At the pipeline root, the constructor read operations are turned into a PCollection with Create; otherwise the input PCollection supplies the read operations and constructor read operations are rejected. --> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pbegin</span><span class="p">,</span> <span class="n">PBegin</span><span class="p">):</span> |
| <span class="n">pcoll</span> <span class="o">=</span> <span class="n">pbegin</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">|</span> <span class="n">Create</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pbegin</span><span class="p">,</span> <span class="n">PBegin</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"Read operation in the constructor only works with "</span> |
| <span class="s2">"the root of the pipeline."</span><span class="p">)</span> |
| <span class="n">pcoll</span> <span class="o">=</span> <span class="n">pbegin</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"Spanner required read operation, sql or table "</span> |
| <span class="s2">"with columns."</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_transaction</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="c1"># reading as batch read using the spanner partitioning query to create</span> |
| <span class="c1"># batches.</span> |
| <span class="n">p</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">pcoll</span> |
| <span class="o">|</span> <span class="s1">'Generate Partitions'</span> <span class="o">>></span> <span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">_CreateReadPartitions</span><span class="p">(</span><span class="n">spanner_configuration</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_configuration</span><span class="p">))</span> |
| <span class="o">|</span> <span class="s1">'Reshuffle'</span> <span class="o">>></span> <span class="n">Reshuffle</span><span class="p">()</span> |
| <span class="o">|</span> <span class="s1">'Read From Partitions'</span> <span class="o">>></span> <span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">_ReadFromPartitionFn</span><span class="p">(</span><span class="n">spanner_configuration</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_configuration</span><span class="p">)))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># reading as naive read, in which we don't make batches and execute the</span> |
| <span class="c1"># queries as a single read.</span> |
| <span class="n">p</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">pcoll</span> |
| <span class="o">|</span> <span class="s1">'Reshuffle'</span> <span class="o">>></span> <span class="n">Reshuffle</span><span class="p">()</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">ReadOperation</span><span class="p">)</span> |
| <span class="o">|</span> <span class="s1">'Perform Read'</span> <span class="o">>></span> <span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">_NaiveSpannerReadDoFn</span><span class="p">(</span><span class="n">spanner_configuration</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_configuration</span><span class="p">),</span> |
| <span class="n">AsSingleton</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_transaction</span><span class="p">)))</span><!-- The transaction PCollection is supplied to every read as a singleton side input. --> |
| <span class="k">return</span> <span class="n">p</span> |
| |
| <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- Exposes the kwargs of the SQL and table read operations, and the transaction, as display data. --> |
| <span class="n">res</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">sql</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="n">table</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">ro</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_operations</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">ro</span><span class="o">.</span><span class="n">is_sql</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="n">sql</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">ro</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">ro</span><span class="o">.</span><span class="n">is_table</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="n">table</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">ro</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">sql</span><span class="p">:</span> |
| <span class="n">res</span><span class="p">[</span><span class="s1">'sql'</span><span class="p">]</span> <span class="o">=</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">sql</span><span class="p">),</span> <span class="n">label</span><span class="o">=</span><span class="s1">'Sql'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">table</span><span class="p">:</span> |
| <span class="n">res</span><span class="p">[</span><span class="s1">'table'</span><span class="p">]</span> <span class="o">=</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">table</span><span class="p">),</span> <span class="n">label</span><span class="o">=</span><span class="s1">'Table'</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_transaction</span><span class="p">:</span> |
| <span class="n">res</span><span class="p">[</span><span class="s1">'transaction'</span><span class="p">]</span> <span class="o">=</span> <span class="n">DisplayDataItem</span><span class="p">(</span> |
| <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_transaction</span><span class="p">),</span> <span class="n">label</span><span class="o">=</span><span class="s1">'transaction'</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">res</span></div> |
| |
| |
| <div class="viewcode-block" id="WriteToSpanner"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteToSpanner">[docs]</a><span class="nd">@experimental</span><span class="p">(</span><span class="n">extra_message</span><span class="o">=</span><span class="s2">"No backwards-compatibility guarantees."</span><span class="p">)</span> |
| <span class="k">class</span> <span class="nc">WriteToSpanner</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">project_id</span><span class="p">,</span> |
| <span class="n">instance_id</span><span class="p">,</span> |
| <span class="n">database_id</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">credentials</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">max_batch_size_bytes</span><span class="o">=</span><span class="mi">1048576</span><span class="p">,</span> |
| <span class="n">max_number_rows</span><span class="o">=</span><span class="mi">50</span><span class="p">,</span> |
| <span class="n">max_number_cells</span><span class="o">=</span><span class="mi">500</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A PTransform to write onto Google Cloud Spanner.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> project_id: Cloud spanner project id. Be sure to use the Project ID,</span> |
| <span class="sd"> not the Project Number.</span> |
| <span class="sd"> instance_id: Cloud spanner instance id.</span> |
| <span class="sd"> database_id: Cloud spanner database id.</span> |
| <span class="sd"> max_batch_size_bytes: (optional) Split the mutations into batches to</span> |
| <span class="sd"> reduce the number of transaction sent to Spanner. By default it is</span> |
| <span class="sd"> set to 1 MB (1048576 Bytes).</span> |
| <span class="sd"> max_number_rows: (optional) Split the mutations into batches to</span> |
| <span class="sd"> reduce the number of transaction sent to Spanner. By default it is</span> |
| <span class="sd"> set to 50 rows per batch.</span> |
| <span class="sd"> max_number_cells: (optional) Split the mutations into batches to</span> |
| <span class="sd"> reduce the number of transaction sent to Spanner. By default it is</span> |
| <span class="sd"> set to 500 cells per batch.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_configuration</span> <span class="o">=</span> <span class="n">_BeamSpannerConfiguration</span><span class="p">(</span> |
| <span class="n">project</span><span class="o">=</span><span class="n">project_id</span><span class="p">,</span> |
| <span class="n">instance</span><span class="o">=</span><span class="n">instance_id</span><span class="p">,</span> |
| <span class="n">database</span><span class="o">=</span><span class="n">database_id</span><span class="p">,</span> |
| <span class="n">table</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">query_name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">credentials</span><span class="o">=</span><span class="n">credentials</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span> |
| <span class="n">snapshot_read_timestamp</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">snapshot_exact_staleness</span><span class="o">=</span><span class="kc">None</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span> <span class="o">=</span> <span class="n">max_batch_size_bytes</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span> <span class="o">=</span> <span class="n">max_number_rows</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span> <span class="o">=</span> <span class="n">max_number_cells</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_database_id</span> <span class="o">=</span> <span class="n">database_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_project_id</span> <span class="o">=</span> <span class="n">project_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_instance_id</span> <span class="o">=</span> <span class="n">instance_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_pool</span> <span class="o">=</span> <span class="n">pool</span> |
| |
| <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">res</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="s1">'project_id'</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_project_id</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s1">'Project Id'</span><span class="p">),</span> |
| <span class="s1">'instance_id'</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_instance_id</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s1">'Instance Id'</span><span class="p">),</span> |
| <span class="s1">'pool'</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_pool</span><span class="p">),</span> <span class="n">label</span><span class="o">=</span><span class="s1">'Pool'</span><span class="p">),</span> |
| <span class="s1">'database'</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_database_id</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s1">'Database'</span><span class="p">),</span> |
| <span class="s1">'batch_size'</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s2">"Batch Size"</span><span class="p">),</span> |
| <span class="s1">'max_number_rows'</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s2">"Max Rows"</span><span class="p">),</span> |
| <span class="s1">'max_number_cells'</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s2">"Max Cells"</span><span class="p">),</span> |
| <span class="p">}</span> |
| <span class="k">return</span> <span class="n">res</span> |
| |
| <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="n">pcoll</span> |
| <span class="o">|</span> <span class="s2">"make batches"</span> <span class="o">>></span> <span class="n">_WriteGroup</span><span class="p">(</span> |
| <span class="n">max_batch_size_bytes</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span><span class="p">,</span> |
| <span class="n">max_number_rows</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span><span class="p">,</span> |
| <span class="n">max_number_cells</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span><span class="p">)</span> |
| <span class="o">|</span> |
| <span class="s1">'Writing to spanner'</span> <span class="o">>></span> <span class="n">ParDo</span><span class="p">(</span><span class="n">_WriteToSpannerDoFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_configuration</span><span class="p">)))</span></div> |
| |
| |
| <span class="k">class</span> <span class="nc">_Mutator</span><span class="p">(</span><span class="n">namedtuple</span><span class="p">(</span><span class="s1">'_Mutator'</span><span class="p">,</span> |
| <span class="p">[</span><span class="s2">"mutation"</span><span class="p">,</span> <span class="s2">"operation"</span><span class="p">,</span> <span class="s2">"kwargs"</span><span class="p">,</span> <span class="s2">"rows"</span><span class="p">,</span> <span class="s2">"cells"</span><span class="p">])</span> |
| <span class="p">):</span> |
| <span class="vm">__slots__</span> <span class="o">=</span> <span class="p">()</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">byte_size</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">mutation</span><span class="o">.</span><span class="n">ByteSize</span><span class="p">()</span> |
| |
| |
| <div class="viewcode-block" id="MutationGroup"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.MutationGroup">[docs]</a><span class="k">class</span> <span class="nc">MutationGroup</span><span class="p">(</span><span class="n">deque</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A Bundle of Spanner Mutations (_Mutator).</span> |
| <span class="sd"> """</span> |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">info</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">cells</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="n">rows</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="nb">bytes</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="k">for</span> <span class="n">m</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="fm">__iter__</span><span class="p">():</span> |
| <span class="nb">bytes</span> <span class="o">+=</span> <span class="n">m</span><span class="o">.</span><span class="n">byte_size</span> |
| <span class="n">rows</span> <span class="o">+=</span> <span class="n">m</span><span class="o">.</span><span class="n">rows</span> |
| <span class="n">cells</span> <span class="o">+=</span> <span class="n">m</span><span class="o">.</span><span class="n">cells</span> |
| <span class="k">return</span> <span class="p">{</span><span class="s2">"rows"</span><span class="p">:</span> <span class="n">rows</span><span class="p">,</span> <span class="s2">"cells"</span><span class="p">:</span> <span class="n">cells</span><span class="p">,</span> <span class="s2">"byte_size"</span><span class="p">:</span> <span class="nb">bytes</span><span class="p">}</span> |
| |
| <div class="viewcode-block" id="MutationGroup.primary"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.MutationGroup.primary">[docs]</a> <span class="k">def</span> <span class="nf">primary</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="nb">next</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="fm">__iter__</span><span class="p">())</span></div></div> |
| |
| |
| <div class="viewcode-block" id="WriteMutation"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteMutation">[docs]</a><span class="k">class</span> <span class="nc">WriteMutation</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| |
| <span class="n">_OPERATION_DELETE</span> <span class="o">=</span> <span class="s2">"delete"</span> |
| <span class="n">_OPERATION_INSERT</span> <span class="o">=</span> <span class="s2">"insert"</span> |
| <span class="n">_OPERATION_INSERT_OR_UPDATE</span> <span class="o">=</span> <span class="s2">"insert_or_update"</span> |
| <span class="n">_OPERATION_REPLACE</span> <span class="o">=</span> <span class="s2">"replace"</span> |
| <span class="n">_OPERATION_UPDATE</span> <span class="o">=</span> <span class="s2">"update"</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">insert</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">update</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">insert_or_update</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">replace</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">delete</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">columns</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">values</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">keyset</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A convenient class to create Spanner Mutations for Write. User can provide</span> |
| <span class="sd"> the operation via constructor or via static methods.</span> |
| |
| <span class="sd"> Note: If a user passing the operation via construction, make sure that it</span> |
| <span class="sd"> will only accept one operation at a time. For example, if a user passing</span> |
| <span class="sd"> a table name in the `insert` parameter, and he also passes the `update`</span> |
| <span class="sd"> parameter value, this will cause an error.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> insert: (Optional) Name of the table in which rows will be inserted.</span> |
| <span class="sd"> update: (Optional) Name of the table in which existing rows will be</span> |
| <span class="sd"> updated.</span> |
| <span class="sd"> insert_or_update: (Optional) Table name in which rows will be written.</span> |
| <span class="sd"> Like insert, except that if the row already exists, then its column</span> |
| <span class="sd"> values are overwritten with the ones provided. Any column values not</span> |
| <span class="sd"> explicitly written are preserved.</span> |
| <span class="sd"> replace: (Optional) Table name in which rows will be replaced. Like</span> |
| <span class="sd"> insert, except that if the row already exists, it is deleted, and the</span> |
| <span class="sd"> column values provided are inserted instead. Unlike `insert_or_update`,</span> |
| <span class="sd"> this means any values not explicitly written become `NULL`.</span> |
| <span class="sd"> delete: (Optional) Table name from which rows will be deleted. Succeeds</span> |
| <span class="sd"> whether or not the named rows were present.</span> |
| <span class="sd"> columns: The names of the columns in table to be written. The list of</span> |
| <span class="sd"> columns must contain enough columns to allow Cloud Spanner to derive</span> |
| <span class="sd"> values for all primary key columns in the row(s) to be modified.</span> |
| <span class="sd"> values: The values to be written. `values` can contain more than one</span> |
| <span class="sd"> list of values. If it does, then multiple rows are written, one for</span> |
| <span class="sd"> each entry in `values`. Each list in `values` must have exactly as</span> |
| <span class="sd"> many entries as there are entries in columns above. Sending multiple</span> |
| <span class="sd"> lists is equivalent to sending multiple Mutations, each containing one</span> |
| <span class="sd"> `values` entry and repeating table and columns.</span> |
| <span class="sd"> keyset: (Optional) The primary keys of the rows within table to delete.</span> |
| <span class="sd"> Delete is idempotent. The transaction will succeed even if some or</span> |
| <span class="sd"> all rows do not exist.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_columns</span> <span class="o">=</span> <span class="n">columns</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_values</span> <span class="o">=</span> <span class="n">values</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_keyset</span> <span class="o">=</span> <span class="n">keyset</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_insert</span> <span class="o">=</span> <span class="n">insert</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_update</span> <span class="o">=</span> <span class="n">update</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_insert_or_update</span> <span class="o">=</span> <span class="n">insert_or_update</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_replace</span> <span class="o">=</span> <span class="n">replace</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_delete</span> <span class="o">=</span> <span class="n">delete</span> |
| |
| <span class="k">if</span> <span class="nb">sum</span><span class="p">([</span><span class="mi">1</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">_insert</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_update</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_insert_or_update</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_replace</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_delete</span><span class="p">]</span> <span class="k">if</span> <span class="n">x</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">])</span> <span class="o">!=</span> <span class="mi">1</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"No or more than one write mutation operation "</span> |
| <span class="s2">"provided: <</span><span class="si">%s</span><span class="s2">: </span><span class="si">%s</span><span class="s2">>"</span> <span class="o">%</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">)))</span> |
| |
| <span class="k">def</span> <span class="fm">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_insert</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">insert</span><span class="p">(</span> |
| <span class="n">table</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_insert</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_columns</span><span class="p">,</span> <span class="n">values</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_values</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_update</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">update</span><span class="p">(</span> |
| <span class="n">table</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_update</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_columns</span><span class="p">,</span> <span class="n">values</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_values</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_insert_or_update</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">insert_or_update</span><span class="p">(</span> |
| <span class="n">table</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_insert_or_update</span><span class="p">,</span> |
| <span class="n">columns</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_columns</span><span class="p">,</span> |
| <span class="n">values</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_values</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_replace</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span> |
| <span class="n">table</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_replace</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_columns</span><span class="p">,</span> <span class="n">values</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_values</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_delete</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">delete</span><span class="p">(</span><span class="n">table</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_delete</span><span class="p">,</span> <span class="n">keyset</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_keyset</span><span class="p">)</span> |
| |
| <div class="viewcode-block" id="WriteMutation.insert"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteMutation.insert">[docs]</a> <span class="nd">@staticmethod</span> |
| <span class="k">def</span> <span class="nf">insert</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">):</span> |
| <span class="sd">"""Insert one or more new table rows.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> table: Name of the table to be modified.</span> |
| <span class="sd"> columns: Name of the table columns to be modified.</span> |
| <span class="sd"> values: Values to be modified.</span> |
| <span class="sd"> """</span> |
| <span class="n">rows</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span> |
| <span class="n">cells</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">columns</span><span class="p">)</span> <span class="o">*</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">_Mutator</span><span class="p">(</span> |
| <span class="n">mutation</span><span class="o">=</span><span class="n">Mutation</span><span class="p">(</span><span class="n">insert</span><span class="o">=</span><span class="n">batch</span><span class="o">.</span><span class="n">_make_write_pb</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">)),</span> |
| <span class="n">operation</span><span class="o">=</span><span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_INSERT</span><span class="p">,</span> |
| <span class="n">rows</span><span class="o">=</span><span class="n">rows</span><span class="p">,</span> |
| <span class="n">cells</span><span class="o">=</span><span class="n">cells</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="o">=</span><span class="p">{</span> |
| <span class="s2">"table"</span><span class="p">:</span> <span class="n">table</span><span class="p">,</span> <span class="s2">"columns"</span><span class="p">:</span> <span class="n">columns</span><span class="p">,</span> <span class="s2">"values"</span><span class="p">:</span> <span class="n">values</span> |
| <span class="p">})</span></div> |
| |
| <div class="viewcode-block" id="WriteMutation.update"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteMutation.update">[docs]</a> <span class="nd">@staticmethod</span> |
| <span class="k">def</span> <span class="nf">update</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">):</span> |
| <span class="sd">"""Update one or more existing table rows.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> table: Name of the table to be modified.</span> |
| <span class="sd"> columns: Name of the table columns to be modified.</span> |
| <span class="sd"> values: Values to be modified.</span> |
| <span class="sd"> """</span> |
| <span class="n">rows</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span> |
| <span class="n">cells</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">columns</span><span class="p">)</span> <span class="o">*</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">_Mutator</span><span class="p">(</span> |
| <span class="n">mutation</span><span class="o">=</span><span class="n">Mutation</span><span class="p">(</span><span class="n">update</span><span class="o">=</span><span class="n">batch</span><span class="o">.</span><span class="n">_make_write_pb</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">)),</span> |
| <span class="n">operation</span><span class="o">=</span><span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_UPDATE</span><span class="p">,</span> |
| <span class="n">rows</span><span class="o">=</span><span class="n">rows</span><span class="p">,</span> |
| <span class="n">cells</span><span class="o">=</span><span class="n">cells</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="o">=</span><span class="p">{</span> |
| <span class="s2">"table"</span><span class="p">:</span> <span class="n">table</span><span class="p">,</span> <span class="s2">"columns"</span><span class="p">:</span> <span class="n">columns</span><span class="p">,</span> <span class="s2">"values"</span><span class="p">:</span> <span class="n">values</span> |
| <span class="p">})</span></div> |
| |
| <div class="viewcode-block" id="WriteMutation.insert_or_update"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteMutation.insert_or_update">[docs]</a> <span class="nd">@staticmethod</span> <!-- Rendered listing (appears to be Sphinx viewcode output; edit the Python module, not this page) of WriteMutation.insert_or_update: returns a _Mutator wrapping an insert_or_update Mutation built by batch._make_write_pb(table, columns, values). --> |
| <span class="k">def</span> <span class="nf">insert_or_update</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">):</span> |
| <span class="sd">"""Insert/update one or more table rows.</span> |
| <span class="sd"> Args:</span> |
| <span class="sd"> table: Name of the table to be modified.</span> |
| <span class="sd"> columns: Name of the table columns to be modified.</span> |
| <span class="sd"> values: Values to be modified.</span> |
| <span class="sd"> """</span> |
| <span class="n">rows</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span> |
| <span class="n">cells</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">columns</span><span class="p">)</span> <span class="o">*</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span> <!-- rows is the number of value rows; cells is columns per row times rows. Presumably these feed the row/cell limits used by the batching DoFns (verify in MutationGroup.info). --> |
| <span class="k">return</span> <span class="n">_Mutator</span><span class="p">(</span> |
| <span class="n">mutation</span><span class="o">=</span><span class="n">Mutation</span><span class="p">(</span> |
| <span class="n">insert_or_update</span><span class="o">=</span><span class="n">batch</span><span class="o">.</span><span class="n">_make_write_pb</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">)),</span> |
| <span class="n">operation</span><span class="o">=</span><span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_INSERT_OR_UPDATE</span><span class="p">,</span> |
| <span class="n">rows</span><span class="o">=</span><span class="n">rows</span><span class="p">,</span> |
| <span class="n">cells</span><span class="o">=</span><span class="n">cells</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="o">=</span><span class="p">{</span> <!-- kwargs are replayed by _WriteToSpannerDoFn.process as b.insert_or_update(**kwargs). --> |
| <span class="s2">"table"</span><span class="p">:</span> <span class="n">table</span><span class="p">,</span> <span class="s2">"columns"</span><span class="p">:</span> <span class="n">columns</span><span class="p">,</span> <span class="s2">"values"</span><span class="p">:</span> <span class="n">values</span> |
| <span class="p">})</span></div> |
| |
| <div class="viewcode-block" id="WriteMutation.replace"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteMutation.replace">[docs]</a> <span class="nd">@staticmethod</span> <!-- Rendered listing (appears to be Sphinx viewcode output; edit the Python module, not this page) of WriteMutation.replace: returns a _Mutator wrapping a replace Mutation built by batch._make_write_pb(table, columns, values). --> |
| <span class="k">def</span> <span class="nf">replace</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">):</span> |
| <span class="sd">"""Replace one or more table rows.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> table: Name of the table to be modified.</span> |
| <span class="sd"> columns: Name of the table columns to be modified.</span> |
| <span class="sd"> values: Values to be modified.</span> |
| <span class="sd"> """</span> |
| <span class="n">rows</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span> |
| <span class="n">cells</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">columns</span><span class="p">)</span> <span class="o">*</span> <span class="nb">len</span><span class="p">(</span><span class="n">values</span><span class="p">)</span> <!-- rows is the number of value rows; cells is columns per row times rows. --> |
| <span class="k">return</span> <span class="n">_Mutator</span><span class="p">(</span> |
| <span class="n">mutation</span><span class="o">=</span><span class="n">Mutation</span><span class="p">(</span><span class="n">replace</span><span class="o">=</span><span class="n">batch</span><span class="o">.</span><span class="n">_make_write_pb</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">columns</span><span class="p">,</span> <span class="n">values</span><span class="p">)),</span> |
| <span class="n">operation</span><span class="o">=</span><span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_REPLACE</span><span class="p">,</span> |
| <span class="n">rows</span><span class="o">=</span><span class="n">rows</span><span class="p">,</span> |
| <span class="n">cells</span><span class="o">=</span><span class="n">cells</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="o">=</span><span class="p">{</span> <!-- kwargs are replayed by _WriteToSpannerDoFn.process as b.replace(**kwargs). --> |
| <span class="s2">"table"</span><span class="p">:</span> <span class="n">table</span><span class="p">,</span> <span class="s2">"columns"</span><span class="p">:</span> <span class="n">columns</span><span class="p">,</span> <span class="s2">"values"</span><span class="p">:</span> <span class="n">values</span> |
| <span class="p">})</span></div> |
| |
| <div class="viewcode-block" id="WriteMutation.delete"><a class="viewcode-back" href="../../../../../apache_beam.io.gcp.experimental.spannerio.html#apache_beam.io.gcp.experimental.spannerio.WriteMutation.delete">[docs]</a> <span class="nd">@staticmethod</span> <!-- Rendered listing (appears to be Sphinx viewcode output; edit the Python module, not this page) of WriteMutation.delete: returns a _Mutator wrapping a Mutation.Delete for the given table and keyset. --> |
| <span class="k">def</span> <span class="nf">delete</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">keyset</span><span class="p">):</span> |
| <span class="sd">"""Delete one or more table rows.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> table: Name of the table to be modified.</span> |
| <span class="sd"> keyset: Keys/ranges identifying rows to delete.</span> |
| <span class="sd"> """</span> |
| <span class="n">delete</span> <span class="o">=</span> <span class="n">Mutation</span><span class="o">.</span><span class="n">Delete</span><span class="p">(</span><span class="n">table</span><span class="o">=</span><span class="n">table</span><span class="p">,</span> <span class="n">key_set</span><span class="o">=</span><span class="n">keyset</span><span class="o">.</span><span class="n">_to_pb</span><span class="p">())</span> <!-- keyset is passed as keyset._to_pb(), presumably a KeySet protobuf; verify against the keyset type. --> |
| <span class="k">return</span> <span class="n">_Mutator</span><span class="p">(</span> |
| <span class="n">mutation</span><span class="o">=</span><span class="n">Mutation</span><span class="p">(</span><span class="n">delete</span><span class="o">=</span><span class="n">delete</span><span class="p">),</span> |
| <span class="n">rows</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> |
| <span class="n">cells</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> <!-- Deletes report zero rows and cells; _BatchableFilterFn sends delete-primary groups to the unbatchable output regardless of these counts. --> |
| <span class="n">operation</span><span class="o">=</span><span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_DELETE</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="o">=</span><span class="p">{</span> <!-- kwargs are replayed by _WriteToSpannerDoFn.process as b.delete(**kwargs). --> |
| <span class="s2">"table"</span><span class="p">:</span> <span class="n">table</span><span class="p">,</span> <span class="s2">"keyset"</span><span class="p">:</span> <span class="n">keyset</span> |
| <span class="p">})</span></div></div> <!-- Closes this method's viewcode-block and, presumably, the enclosing WriteMutation class block opened earlier in the page. --> |
| |
| |
| <span class="nd">@with_input_types</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">Union</span><span class="p">[</span><span class="n">MutationGroup</span><span class="p">,</span> <span class="n">TaggedOutput</span><span class="p">])</span> <!-- Rendered listing of _BatchFn: merges incoming MutationGroups into one accumulated MutationGroup, flushing it before any element that would push bytes, cells, or rows past the configured maxima. --> |
| <span class="nd">@with_output_types</span><span class="p">(</span><span class="n">MutationGroup</span><span class="p">)</span> |
| <span class="k">class</span> <span class="nc">_BatchFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Batches mutations together.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">max_batch_size_bytes</span><span class="p">,</span> <span class="n">max_number_rows</span><span class="p">,</span> <span class="n">max_number_cells</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span> <span class="o">=</span> <span class="n">max_batch_size_bytes</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span> <span class="o">=</span> <span class="n">max_number_rows</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span> <span class="o">=</span> <span class="n">max_number_cells</span> |
| |
| <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <!-- Starts every bundle with an empty batch and zeroed running totals (same state as _reset_count). --> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="o">=</span> <span class="n">MutationGroup</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_size_in_bytes</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_rows</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_cells</span> <span class="o">=</span> <span class="mi">0</span> |
| |
| <span class="k">def</span> <span class="nf">_reset_count</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="o">=</span> <span class="n">MutationGroup</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_size_in_bytes</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_rows</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_cells</span> <span class="o">=</span> <span class="mi">0</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="n">mg_info</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">info</span> |
| |
| <span class="k">if</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">'byte_size'</span><span class="p">]</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">_size_in_bytes</span> <span class="o">></span> <span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span> \ |
| <span class="ow">or</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">'cells'</span><span class="p">]</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cells</span> <span class="o">></span> <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span> \ |
| <span class="ow">or</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">'rows'</span><span class="p">]</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">_rows</span> <span class="o">></span> <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span><span class="p">:</span> <!-- Each limit is checked against the running total plus this element's info count. --> |
| <span class="c1"># Batch is full, output the batch and resetting the count.</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="p">:</span> <!-- An empty batch is never flushed here. --> |
| <span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_reset_count</span><span class="p">()</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">element</span><span class="p">)</span> <!-- The element is added after any flush, so it always starts or joins the current batch. --> |
| |
| <span class="c1"># total byte size of the mutation group.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_size_in_bytes</span> <span class="o">+=</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">'byte_size'</span><span class="p">]</span> |
| |
| <span class="c1"># total rows in the mutation group.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_rows</span> <span class="o">+=</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">'rows'</span><span class="p">]</span> |
| |
| <span class="c1"># total cells in the mutation group.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_cells</span> <span class="o">+=</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">'cells'</span><span class="p">]</span> |
| |
| <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> <!-- NOTE(review): _batch is always a MutationGroup after start_bundle and _reset_count, so this check always passes; a bundle that processed no elements still emits an empty MutationGroup. --> |
| <span class="k">yield</span> <span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="p">)</span> <!-- Emits the remaining batch as a GlobalWindows windowed value. --> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| |
| <span class="nd">@with_input_types</span><span class="p">(</span><span class="n">MutationGroup</span><span class="p">)</span> <!-- Rendered listing of _BatchableFilterFn: sends a MutationGroup to the unbatchable tagged output when its primary mutation is a delete or when any single limit (bytes, cells, rows) is exceeded; otherwise emits it on the main output. --> |
| <span class="nd">@with_output_types</span><span class="p">(</span><span class="n">MutationGroup</span><span class="p">)</span> |
| <span class="k">class</span> <span class="nc">_BatchableFilterFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Filters MutationGroups larger than the batch size to the output tagged with</span> |
| <span class="sd"> OUTPUT_TAG_UNBATCHABLE.</span> |
| <span class="sd"> """</span> |
| <span class="n">OUTPUT_TAG_UNBATCHABLE</span> <span class="o">=</span> <span class="s1">'unbatchable'</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">max_batch_size_bytes</span><span class="p">,</span> <span class="n">max_number_rows</span><span class="p">,</span> <span class="n">max_number_cells</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span> <span class="o">=</span> <span class="n">max_batch_size_bytes</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span> <span class="o">=</span> <span class="n">max_number_rows</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span> <span class="o">=</span> <span class="n">max_number_cells</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batchable</span> <span class="o">=</span> <span class="kc">None</span> <!-- NOTE(review): _batchable and _unbatchable are set to None and never read in this class. --> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_unbatchable</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">element</span><span class="o">.</span><span class="n">primary</span><span class="p">()</span><span class="o">.</span><span class="n">operation</span> <span class="o">==</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_DELETE</span><span class="p">:</span> <!-- Only element.primary() is inspected; presumably the group's first mutation, verify in MutationGroup. --> |
| <span class="c1"># As delete mutations are not batchable.</span> |
| <span class="k">yield</span> <span class="n">TaggedOutput</span><span class="p">(</span><span class="n">_BatchableFilterFn</span><span class="o">.</span><span class="n">OUTPUT_TAG_UNBATCHABLE</span><span class="p">,</span> <span class="n">element</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">mg_info</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">info</span> |
| <span class="k">if</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">'byte_size'</span><span class="p">]</span> <span class="o">></span> <span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span> \ |
| <span class="ow">or</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">'cells'</span><span class="p">]</span> <span class="o">></span> <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span> \ |
| <span class="ow">or</span> <span class="n">mg_info</span><span class="p">[</span><span class="s1">'rows'</span><span class="p">]</span> <span class="o">></span> <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span><span class="p">:</span> <!-- A single group that alone exceeds any limit cannot fit in a batch. --> |
| <span class="k">yield</span> <span class="n">TaggedOutput</span><span class="p">(</span><span class="n">_BatchableFilterFn</span><span class="o">.</span><span class="n">OUTPUT_TAG_UNBATCHABLE</span><span class="p">,</span> <span class="n">element</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">element</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_WriteToSpannerDoFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span> <!-- Rendered listing of _WriteToSpannerDoFn: applies each incoming MutationGroup inside one self._db_instance.batch() context and records a per-table ServiceCallMetric outcome (ok, or an error code/message). --> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">spanner_configuration</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span> <span class="o">=</span> <span class="n">spanner_configuration</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_db_instance</span> <span class="o">=</span> <span class="kc">None</span> <!-- Created in setup from the configured project, instance, database and pool. --> |
| <span class="bp">self</span><span class="o">.</span><span class="n">batches</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span> <span class="s1">'SpannerBatches'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SERVICE_LABEL</span><span class="p">:</span> <span class="s1">'Spanner'</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">METHOD_LABEL</span><span class="p">:</span> <span class="s1">'Write'</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_PROJECT_ID</span><span class="p">:</span> <span class="n">spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_DATABASE_ID</span><span class="p">:</span> <span class="n">spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">,</span> |
| <span class="p">}</span> |
| <span class="c1"># table_id to metrics</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span> <span class="o">=</span> <span class="p">{}</span> <!-- Filled lazily by _register_table_metric and cleared in start_bundle. --> |
| |
| <span class="k">def</span> <span class="nf">_register_table_metric</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">table_id</span><span class="p">):</span> <!-- Creates one ServiceCallMetric per table_id (base labels plus resource and table labels); no-op if already registered. --> |
| <span class="k">if</span> <span class="n">table_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span><span class="p">:</span> |
| <span class="k">return</span> |
| <span class="n">database_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span> |
| <span class="n">project_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span> |
| <span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">SpannerTable</span><span class="p">(</span> |
| <span class="n">project_id</span><span class="p">,</span> <span class="n">database_id</span><span class="p">,</span> <span class="n">table_id</span><span class="p">)</span> |
| <span class="n">labels</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">base_labels</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SPANNER_TABLE_ID</span><span class="p">:</span> <span class="n">table_id</span> |
| <span class="p">}</span> |
| <span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span> |
| <span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span> |
| <span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span><span class="p">[</span><span class="n">table_id</span><span class="p">]</span> <span class="o">=</span> <span class="n">service_call_metric</span> |
| |
| <span class="k">def</span> <span class="nf">setup</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">spanner_client</span> <span class="o">=</span> <span class="n">Client</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">project</span><span class="p">)</span> |
| <span class="n">instance</span> <span class="o">=</span> <span class="n">spanner_client</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">instance</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_db_instance</span> <span class="o">=</span> <span class="n">instance</span><span class="o">.</span><span class="n">database</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">database</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_spanner_configuration</span><span class="o">.</span><span class="n">pool</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span> <span class="o">=</span> <span class="p">{}</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">batches</span><span class="o">.</span><span class="n">inc</span><span class="p">()</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">_db_instance</span><span class="o">.</span><span class="n">batch</span><span class="p">()</span> <span class="k">as</span> <span class="n">b</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">m</span> <span class="ow">in</span> <span class="n">element</span><span class="p">:</span> |
| <span class="n">table_id</span> <span class="o">=</span> <span class="n">m</span><span class="o">.</span><span class="n">kwargs</span><span class="p">[</span><span class="s1">'table'</span><span class="p">]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_register_table_metric</span><span class="p">(</span><span class="n">table_id</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">m</span><span class="o">.</span><span class="n">operation</span> <span class="o">==</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_DELETE</span><span class="p">:</span> <!-- Dispatch: each WriteMutation operation maps to the batch method of the same name, called with the kwargs recorded by the WriteMutation factory. --> |
| <span class="n">batch_func</span> <span class="o">=</span> <span class="n">b</span><span class="o">.</span><span class="n">delete</span> |
| <span class="k">elif</span> <span class="n">m</span><span class="o">.</span><span class="n">operation</span> <span class="o">==</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_REPLACE</span><span class="p">:</span> |
| <span class="n">batch_func</span> <span class="o">=</span> <span class="n">b</span><span class="o">.</span><span class="n">replace</span> |
| <span class="k">elif</span> <span class="n">m</span><span class="o">.</span><span class="n">operation</span> <span class="o">==</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_INSERT_OR_UPDATE</span><span class="p">:</span> |
| <span class="n">batch_func</span> <span class="o">=</span> <span class="n">b</span><span class="o">.</span><span class="n">insert_or_update</span> |
| <span class="k">elif</span> <span class="n">m</span><span class="o">.</span><span class="n">operation</span> <span class="o">==</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_INSERT</span><span class="p">:</span> |
| <span class="n">batch_func</span> <span class="o">=</span> <span class="n">b</span><span class="o">.</span><span class="n">insert</span> |
| <span class="k">elif</span> <span class="n">m</span><span class="o">.</span><span class="n">operation</span> <span class="o">==</span> <span class="n">WriteMutation</span><span class="o">.</span><span class="n">_OPERATION_UPDATE</span><span class="p">:</span> |
| <span class="n">batch_func</span> <span class="o">=</span> <span class="n">b</span><span class="o">.</span><span class="n">update</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Unknown operation action: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="n">m</span><span class="o">.</span><span class="n">operation</span><span class="p">)</span> |
| <span class="n">batch_func</span><span class="p">(</span><span class="o">**</span><span class="n">m</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span> |
| <span class="k">except</span> <span class="p">(</span><span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span><span class="p">)</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">service_metric</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span><span class="o">.</span><span class="n">values</span><span class="p">():</span> <!-- NOTE(review): service_metrics accumulates across process calls within a bundle, so this outcome (and those in the branches below) is recorded for every table seen so far in the bundle, not only the tables in this element. --> |
| <span class="n">service_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">code</span><span class="o">.</span><span class="n">value</span><span class="p">))</span> <!-- NOTE(review): assumes e.code exposes .value (enum-like); confirm for GoogleAPICallError, whose code attribute may be a plain int HTTP status. --> |
| <span class="k">raise</span> |
| <span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">service_metric</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span><span class="o">.</span><span class="n">values</span><span class="p">():</span> |
| <span class="n">service_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">))</span> |
| <span class="k">raise</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">service_metric</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">service_metrics</span><span class="o">.</span><span class="n">values</span><span class="p">():</span> |
| <span class="n">service_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="s1">'ok'</span><span class="p">)</span> |
| |
| |
| <span class="nd">@with_input_types</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">Union</span><span class="p">[</span><span class="n">MutationGroup</span><span class="p">,</span> <span class="n">_Mutator</span><span class="p">])</span> <!-- Rendered listing of _MakeMutationGroupsFn: passes MutationGroup elements through, wraps a single _Mutator in a one-element MutationGroup, and raises ValueError for any other type. --> |
| <span class="nd">@with_output_types</span><span class="p">(</span><span class="n">MutationGroup</span><span class="p">)</span> |
| <span class="k">class</span> <span class="nc">_MakeMutationGroupsFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Make Mutation group object if the element is the instance of _Mutator.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">MutationGroup</span><span class="p">):</span> |
| <span class="k">yield</span> <span class="n">element</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">_Mutator</span><span class="p">):</span> |
| <span class="k">yield</span> <span class="n">MutationGroup</span><span class="p">([</span><span class="n">element</span><span class="p">])</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"Invalid object type: </span><span class="si">%s</span><span class="s2">. Object must be an instance of "</span> |
| <span class="s2">"MutationGroup or WriteMutations"</span> <span class="o">%</span> <span class="nb">str</span><span class="p">(</span><span class="n">element</span><span class="p">))</span> <!-- The adjacent string literals are joined at compile time, so % formats the full message. The message names WriteMutations, the public factory whose static methods return _Mutator. --> |
| |
| |
| <span class="k">class</span> <span class="nc">_WriteGroup</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">max_batch_size_bytes</span><span class="p">,</span> <span class="n">max_number_rows</span><span class="p">,</span> <span class="n">max_number_cells</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span> <span class="o">=</span> <span class="n">max_batch_size_bytes</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span> <span class="o">=</span> <span class="n">max_number_rows</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span> <span class="o">=</span> <span class="n">max_number_cells</span> |
| |
| <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="n">filter_batchable_mutations</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">pcoll</span> |
| <span class="o">|</span> <span class="s1">'Making mutation groups'</span> <span class="o">>></span> <span class="n">ParDo</span><span class="p">(</span><span class="n">_MakeMutationGroupsFn</span><span class="p">())</span> |
| <span class="o">|</span> <span class="s1">'Filtering Batchable Mutations'</span> <span class="o">>></span> <span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">_BatchableFilterFn</span><span class="p">(</span> |
| <span class="n">max_batch_size_bytes</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span><span class="p">,</span> |
| <span class="n">max_number_rows</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span><span class="p">,</span> |
| <span class="n">max_number_cells</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span><span class="p">))</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span> |
| <span class="n">_BatchableFilterFn</span><span class="o">.</span><span class="n">OUTPUT_TAG_UNBATCHABLE</span><span class="p">,</span> <span class="n">main</span><span class="o">=</span><span class="s1">'batchable'</span><span class="p">)</span> |
| <span class="p">)</span> |
| |
| <span class="n">batching_batchables</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">filter_batchable_mutations</span><span class="p">[</span><span class="s1">'batchable'</span><span class="p">]</span> |
| <span class="o">|</span> <span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">_BatchFn</span><span class="p">(</span> |
| <span class="n">max_batch_size_bytes</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_batch_size_bytes</span><span class="p">,</span> |
| <span class="n">max_number_rows</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_number_rows</span><span class="p">,</span> |
| <span class="n">max_number_cells</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_number_cells</span><span class="p">)))</span> |
| |
| <span class="k">return</span> <span class="p">((</span> |
| <span class="n">batching_batchables</span><span class="p">,</span> |
| <span class="n">filter_batchable_mutations</span><span class="p">[</span><span class="n">_BatchableFilterFn</span><span class="o">.</span><span class="n">OUTPUT_TAG_UNBATCHABLE</span><span class="p">])</span> |
| <span class="o">|</span> <span class="s1">'Merging batchable and unbatchable'</span> <span class="o">>></span> <span class="n">Flatten</span><span class="p">())</span> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| <footer> |
| <!-- Page footer: copyright notice and build/theme attribution. --> |
| |
| <hr> |
| |
| <div role="contentinfo"> |
| <p> |
| © Copyright |
| |
| </p> |
| </div> |
| Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <script> |
| // Turn on the Read the Docs theme sidebar navigation once the DOM is ready. |
| // NOTE(review): the `true` flag presumably enables sticky navigation; confirm against _static/js/theme.js. |
| jQuery(document).ready(function () { SphinxRtdTheme.Navigation.enable(true); }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |