| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <title>apache_beam.io.gcp.datastore.v1new.datastoreio — Apache Beam 2.47.0 documentation</title> |
| <!-- Classic scripts: they execute synchronously, in the order listed ("type" omitted; text/javascript is the default). --> |
| <script src="../../../../../../_static/js/modernizr.min.js"></script> |
| <!-- Keep the id and data-url_root attributes: they appear to be lookup hooks for Sphinx's documentation_options.js. --> |
| <script id="documentation_options" data-url_root="../../../../../../" src="../../../../../../_static/documentation_options.js"></script> |
| <script src="../../../../../../_static/jquery.js"></script> |
| <script src="../../../../../../_static/underscore.js"></script> |
| <script src="../../../../../../_static/doctools.js"></script> |
| <script src="../../../../../../_static/language_data.js"></script> |
| <!-- MathJax comes from a third-party CDN and is loaded async, so it does not block parsing. --> |
| <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
| <!-- Theme script is listed after jQuery; presumably it depends on it, so keep this order. --> |
| <script src="../../../../../../_static/js/theme.js"></script> |
| <!-- Stylesheets ("type" omitted; text/css is the default). --> |
| <link rel="stylesheet" href="../../../../../../_static/css/theme.css"> |
| <link rel="stylesheet" href="../../../../../../_static/pygments.css"> |
| <!-- Navigation relations to the Sphinx index and search pages. --> |
| <link rel="index" title="Index" href="../../../../../../genindex.html"> |
| <link rel="search" title="Search" href="../../../../../../search.html"> |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search"> |
| <!-- Project home link and version badge. --> |
| <a href="../../../../../../index.html" class="icon icon-home"> Apache Beam |
| </a> |
| <div class="version"> |
| 2.47.0 |
| </div> |
| <div role="search"> |
| <form id="rtd-search-form" class="wy-form" action="../../../../../../search.html" method="get"> |
| <!-- A placeholder is not an accessible name (it vanishes on input); aria-label supplies one. --> |
| <!-- NOTE(review): type stays "text" because the theme stylesheet may select on input[type=text]. --> |
| <input type="text" name="q" placeholder="Search docs" aria-label="Search docs"> |
| <input type="hidden" name="check_keywords" value="yes"> |
| <input type="hidden" name="area" value="default"> |
| </form> |
| </div> |
| </div> |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| <!-- Top-level packages. --> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.io.html">apache_beam.io package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.ml.html">apache_beam.ml package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.options.html">apache_beam.options package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.testing.html">apache_beam.testing package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.yaml.html">apache_beam.yaml package</a></li> |
| </ul> |
| <!-- Top-level modules. --> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.error.html">apache_beam.error module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
| </ul> |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
| <!-- Menu icon; data-toggle="wy-nav-top" looks like a hook for a script-bound click handler (presumably theme.js). --> |
| <!-- NOTE(review): an <i> is neither focusable nor named, so this control is mouse-only; switch to a <button> together with theme CSS/JS. --> |
| <i class="fa fa-bars" data-toggle="wy-nav-top"></i> |
| <a href="../../../../../../index.html">Apache Beam</a> |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| <ul class="wy-breadcrumbs"> |
| <!-- The "»" separators are decorative; aria-hidden stops screen readers from announcing them. --> |
| <li><a href="../../../../../../index.html">Docs</a> <span aria-hidden="true">»</span></li> |
| <li><a href="../../../../../index.html">Module code</a> <span aria-hidden="true">»</span></li> |
| <!-- Last crumb is the current page (not a link); aria-current exposes that to assistive technology. --> |
| <li aria-current="page">apache_beam.io.gcp.datastore.v1new.datastoreio</li> |
| <li class="wy-breadcrumbs-aside"> |
| </li> |
| </ul> |
| <hr> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.io.gcp.datastore.v1new.datastoreio</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="sd">"""</span> |
| <span class="sd">A connector for reading from and writing to Google Cloud Datastore.</span> |
| |
| <span class="sd">This module uses the newer google-cloud-datastore client package. Its API was</span> |
| <span class="sd">different enough to require extensive changes to this and associated modules.</span> |
| |
| <span class="sd">**Updates to the I/O connector code**</span> |
| |
| <span class="sd">For any significant updates to this I/O connector, please consider involving</span> |
| <span class="sd">corresponding code reviewers mentioned in</span> |
| <span class="sd">https://github.com/apache/beam/blob/master/sdks/python/OWNERS</span> |
| <span class="sd">"""</span> |
| <span class="c1"># pytype: skip-file</span> |
| |
| <span class="kn">import</span> <span class="nn">logging</span> |
| <span class="kn">import</span> <span class="nn">time</span> |
| |
| <span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">typehints</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.internal.metrics.metric</span> <span class="kn">import</span> <span class="n">ServiceCallMetric</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp</span> <span class="kn">import</span> <span class="n">resource_identifiers</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.datastore.v1new</span> <span class="kn">import</span> <span class="n">helper</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.datastore.v1new</span> <span class="kn">import</span> <span class="n">query_splitter</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.datastore.v1new</span> <span class="kn">import</span> <span class="n">types</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.datastore.v1new</span> <span class="kn">import</span> <span class="n">util</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.datastore.v1new.adaptive_throttler</span> <span class="kn">import</span> <span class="n">AdaptiveThrottler</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.datastore.v1new.rampup_throttling_fn</span> <span class="kn">import</span> <span class="n">RampupThrottlingFn</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.metrics</span> <span class="kn">import</span> <span class="n">monitoring_infos</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.metrics.metric</span> <span class="kn">import</span> <span class="n">Metrics</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">Create</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">DoFn</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">ParDo</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">PTransform</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">Reshuffle</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.utils</span> <span class="kn">import</span> <span class="n">retry</span> |
| |
| <span class="c1"># Protect against environments where datastore library is not available.</span> |
| <span class="c1"># pylint: disable=wrong-import-order, wrong-import-position</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">apitools.base.py.exceptions</span> <span class="kn">import</span> <span class="n">HttpError</span> |
| <span class="kn">from</span> <span class="nn">google.api_core.exceptions</span> <span class="kn">import</span> <span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span> |
| <span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span> |
| <span class="k">pass</span> |
| |
| <span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s1">'ReadFromDatastore'</span><span class="p">,</span> <span class="s1">'WriteToDatastore'</span><span class="p">,</span> <span class="s1">'DeleteFromDatastore'</span><span class="p">]</span> |
| |
| <span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="ReadFromDatastore"><a class="viewcode-back" href="../../../../../../apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore">[docs]</a><span class="nd">@typehints</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Entity</span><span class="p">)</span> |
| <span class="k">class</span> <span class="nc">ReadFromDatastore</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A ``PTransform`` for querying Google Cloud Datastore.</span> |
| |
| <span class="sd"> To read a ``PCollection[Entity]`` from a Cloud Datastore ``Query``, use</span> |
| <span class="sd"> the ``ReadFromDatastore`` transform by providing a `query` to</span> |
| <span class="sd"> read from. The project and optional namespace are set in the query.</span> |
| <span class="sd"> The query will be split into multiple queries to allow for parallelism. The</span> |
| <span class="sd"> degree of parallelism is automatically determined, but can be overridden by</span> |
| <span class="sd"> setting `num_splits` to a value of 1 or greater.</span> |
| |
| <span class="sd"> Note: Normally, a runner will read from Cloud Datastore in parallel across</span> |
| <span class="sd"> many workers. However, when the `query` is configured with a `limit` or if the</span> |
| <span class="sd"> query contains inequality filters like `GREATER_THAN, LESS_THAN` etc., then</span> |
| <span class="sd"> all the returned results will be read by a single worker in order to ensure</span> |
| <span class="sd"> correct data. Since data is read from a single worker, this could have</span> |
| <span class="sd"> significant impact on the performance of the job. Using a</span> |
| <span class="sd"> :class:`~apache_beam.transforms.util.Reshuffle` transform after the read in</span> |
| <span class="sd"> this case might be beneficial for parallelizing work across workers.</span> |
| |
| <span class="sd"> The semantics for query splitting is defined below:</span> |
| <span class="sd"> 1. If `num_splits` is equal to 0, then the number of splits will be chosen</span> |
| <span class="sd"> dynamically at runtime based on the query data size.</span> |
| |
| <span class="sd"> 2. Any value of `num_splits` greater than</span> |
| <span class="sd"> `ReadFromDatastore._NUM_QUERY_SPLITS_MAX` will be capped at that value.</span> |
| |
| <span class="sd"> 3. If the `query` has a user limit set, or contains inequality filters, then</span> |
| <span class="sd"> `num_splits` will be ignored and no split will be performed.</span> |
| |
| <span class="sd"> 4. Under certain cases Cloud Datastore is unable to split query to the</span> |
| <span class="sd"> requested number of splits. In such cases we just use whatever Cloud</span> |
| <span class="sd"> Datastore returns.</span> |
| |
| <span class="sd"> See https://developers.google.com/datastore/ for more details on Google Cloud</span> |
| <span class="sd"> Datastore.</span> |
| <span class="sd"> """</span> |
| |
| <span class="c1"># An upper bound on the number of splits for a query.</span> |
| <span class="n">_NUM_QUERY_SPLITS_MAX</span> <span class="o">=</span> <span class="mi">50000</span> |
| <span class="c1"># A lower bound on the number of splits for a query. This is to ensure that</span> |
| <span class="c1"># we parallelize the query even when Datastore statistics are not available.</span> |
| <span class="n">_NUM_QUERY_SPLITS_MIN</span> <span class="o">=</span> <span class="mi">12</span> |
| <span class="c1"># Default bundle size of 64MB.</span> |
| <span class="n">_DEFAULT_BUNDLE_SIZE_BYTES</span> <span class="o">=</span> <span class="mi">64</span> <span class="o">*</span> <span class="mi">1024</span> <span class="o">*</span> <span class="mi">1024</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">query</span><span class="p">,</span> <span class="n">num_splits</span><span class="o">=</span><span class="mi">0</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Initialize the `ReadFromDatastore` transform.</span> |
| |
| <span class="sd"> This transform outputs elements of type</span> |
| <span class="sd"> :class:`~apache_beam.io.gcp.datastore.v1new.types.Entity`.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> query: (:class:`~apache_beam.io.gcp.datastore.v1new.types.Query`) query</span> |
| <span class="sd"> used to fetch entities.</span> |
| <span class="sd"> num_splits: (:class:`int`) (optional) Number of splits for the query.</span> |
| <span class="sd"> """</span> |
| <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">query</span><span class="o">.</span><span class="n">project</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"query.project cannot be empty"</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">query</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"query cannot be empty"</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">num_splits</span> <span class="o"><</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"num_splits must be greater than or equal 0"</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_project</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">project</span> |
| <span class="c1"># using _namespace conflicts with DisplayData._namespace</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_datastore_namespace</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">namespace</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_query</span> <span class="o">=</span> <span class="n">query</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span> <span class="o">=</span> <span class="n">num_splits</span> |
| |
| <div class="viewcode-block" id="ReadFromDatastore.expand"><a class="viewcode-back" href="../../../../../../apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="c1"># This is a composite transform involves the following:</span> |
| <span class="c1"># 1. Create a singleton of the user provided `query` and apply a ``ParDo``</span> |
| <span class="c1"># that splits the query into `num_splits` queries if possible.</span> |
| <span class="c1">#</span> |
| <span class="c1"># If the value of `num_splits` is 0, the number of splits will be</span> |
| <span class="c1"># computed dynamically based on the size of the data for the `query`.</span> |
| <span class="c1">#</span> |
| <span class="c1"># 2. The resulting ``PCollection`` is sharded across workers using a</span> |
| <span class="c1"># ``Reshuffle`` operation.</span> |
| <span class="c1">#</span> |
| <span class="c1"># 3. In the third step, a ``ParDo`` reads entities for each query and</span> |
| <span class="c1"># outputs a ``PCollection[Entity]``.</span> |
| |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span> |
| <span class="o">|</span> <span class="s1">'UserQuery'</span> <span class="o">>></span> <span class="n">Create</span><span class="p">([</span><span class="bp">self</span><span class="o">.</span><span class="n">_query</span><span class="p">])</span> |
| <span class="o">|</span> <span class="s1">'SplitQuery'</span> <span class="o">>></span> <span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_SplitQueryFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span><span class="p">))</span> |
| <span class="o">|</span> <span class="n">Reshuffle</span><span class="p">()</span> |
| <span class="o">|</span> <span class="s1">'Read'</span> <span class="o">>></span> <span class="n">ParDo</span><span class="p">(</span><span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_QueryFn</span><span class="p">()))</span></div> |
| |
| <div class="viewcode-block" id="ReadFromDatastore.display_data"><a class="viewcode-back" href="../../../../../../apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">disp_data</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="s1">'project'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_query</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> |
| <span class="s1">'query'</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_query</span><span class="p">),</span> |
| <span class="s1">'num_splits'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span> |
| <span class="p">}</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_datastore_namespace</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">disp_data</span><span class="p">[</span><span class="s1">'namespace'</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_datastore_namespace</span> |
| |
| <span class="k">return</span> <span class="n">disp_data</span></div> |
| |
| <span class="nd">@typehints</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Query</span><span class="p">)</span> |
| <span class="nd">@typehints</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Query</span><span class="p">)</span> |
| <span class="k">class</span> <span class="nc">_SplitQueryFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A `DoFn` that splits a given query into multiple sub-queries."""</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">num_splits</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span> <span class="o">=</span> <span class="n">num_splits</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">query</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="n">client</span> <span class="o">=</span> <span class="n">helper</span><span class="o">.</span><span class="n">get_client</span><span class="p">(</span><span class="n">query</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> <span class="n">query</span><span class="o">.</span><span class="n">namespace</span><span class="p">)</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="c1"># Short circuit estimating num_splits if split is not possible.</span> |
| <span class="n">query_splitter</span><span class="o">.</span><span class="n">validate_split</span><span class="p">(</span><span class="n">query</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="n">estimated_num_splits</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_estimated_num_splits</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">query</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">estimated_num_splits</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span> |
| |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Splitting the query into </span><span class="si">%d</span><span class="s2"> splits"</span><span class="p">,</span> <span class="n">estimated_num_splits</span><span class="p">)</span> |
| <span class="n">query_splits</span> <span class="o">=</span> <span class="n">query_splitter</span><span class="o">.</span><span class="n">get_splits</span><span class="p">(</span> |
| <span class="n">client</span><span class="p">,</span> <span class="n">query</span><span class="p">,</span> <span class="n">estimated_num_splits</span><span class="p">)</span> |
| <span class="k">except</span> <span class="n">query_splitter</span><span class="o">.</span><span class="n">QuerySplitterError</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s2">"Unable to parallelize the given query: </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">query</span><span class="p">,</span> <span class="n">exc_info</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> |
| <span class="n">query_splits</span> <span class="o">=</span> <span class="p">[</span><span class="n">query</span><span class="p">]</span> |
| |
| <span class="k">return</span> <span class="n">query_splits</span> |
| |
| <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">disp_data</span> <span class="o">=</span> <span class="p">{</span><span class="s1">'num_splits'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span><span class="p">}</span> |
| <span class="k">return</span> <span class="n">disp_data</span> |
| |
| <span class="nd">@staticmethod</span> |
| <span class="k">def</span> <span class="nf">query_latest_statistics_timestamp</span><span class="p">(</span><span class="n">client</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Fetches the latest timestamp of statistics from Cloud Datastore.</span> |
| |
| <span class="sd"> Cloud Datastore system tables with statistics are periodically updated.</span> |
| <span class="sd"> This method fetches the latest timestamp (in microseconds) of statistics</span> |
| <span class="sd"> update using the `__Stat_Total__` table.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">client</span><span class="o">.</span><span class="n">namespace</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">kind</span> <span class="o">=</span> <span class="s1">'__Stat_Total__'</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">kind</span> <span class="o">=</span> <span class="s1">'__Stat_Ns_Total__'</span> |
| <span class="n">query</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">query</span><span class="p">(</span> |
| <span class="n">kind</span><span class="o">=</span><span class="n">kind</span><span class="p">,</span> <span class="n">order</span><span class="o">=</span><span class="p">[</span> |
| <span class="s2">"-timestamp"</span><span class="p">,</span> |
| <span class="p">])</span> |
| <span class="n">entities</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">query</span><span class="o">.</span><span class="n">fetch</span><span class="p">(</span><span class="n">limit</span><span class="o">=</span><span class="mi">1</span><span class="p">))</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">entities</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span><span class="s2">"Datastore total statistics unavailable."</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">entities</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="s1">'timestamp'</span><span class="p">]</span> |
| |
| <span class="nd">@staticmethod</span> |
| <span class="k">def</span> <span class="nf">get_estimated_size_bytes</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">query</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Get the estimated size of the data returned by this instance's query.</span> |
| |
| <span class="sd"> Cloud Datastore provides no way to get a good estimate of how large the</span> |
| <span class="sd"> result of a query is going to be. Hence we use the __Stat_Kind__ system</span> |
| <span class="sd"> table to get size of the entire kind as an approximate estimate, assuming</span> |
| <span class="sd"> exactly 1 kind is specified in the query.</span> |
| <span class="sd"> See https://cloud.google.com/datastore/docs/concepts/stats.</span> |
| <span class="sd"> """</span> |
| <span class="n">kind_name</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">kind</span> |
| <span class="n">latest_timestamp</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_SplitQueryFn</span><span class="o">.</span><span class="n">query_latest_statistics_timestamp</span><span class="p">(</span> |
| <span class="n">client</span><span class="p">))</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s1">'Latest stats timestamp for kind </span><span class="si">%s</span><span class="s1"> is </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">kind_name</span><span class="p">,</span> |
| <span class="n">latest_timestamp</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">client</span><span class="o">.</span><span class="n">namespace</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">kind</span> <span class="o">=</span> <span class="s1">'__Stat_Kind__'</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">kind</span> <span class="o">=</span> <span class="s1">'__Stat_Ns_Kind__'</span> |
| <span class="n">query</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">kind</span><span class="o">=</span><span class="n">kind</span><span class="p">)</span> |
| <span class="n">query</span><span class="o">.</span><span class="n">add_filter</span><span class="p">(</span><span class="s1">'kind_name'</span><span class="p">,</span> <span class="s1">'='</span><span class="p">,</span> <span class="n">kind_name</span><span class="p">)</span> |
| <span class="n">query</span><span class="o">.</span><span class="n">add_filter</span><span class="p">(</span><span class="s1">'timestamp'</span><span class="p">,</span> <span class="s1">'='</span><span class="p">,</span> <span class="n">latest_timestamp</span><span class="p">)</span> |
| |
| <span class="n">entities</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">query</span><span class="o">.</span><span class="n">fetch</span><span class="p">(</span><span class="n">limit</span><span class="o">=</span><span class="mi">1</span><span class="p">))</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">entities</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span> |
| <span class="s1">'Datastore statistics for kind </span><span class="si">%s</span><span class="s1"> unavailable'</span> <span class="o">%</span> <span class="n">kind_name</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">entities</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="s1">'entity_bytes'</span><span class="p">]</span> |
| |
| <span class="nd">@staticmethod</span> |
| <span class="k">def</span> <span class="nf">get_estimated_num_splits</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">query</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Computes the number of splits to be performed on the query."""</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">estimated_size_bytes</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_SplitQueryFn</span><span class="o">.</span><span class="n">get_estimated_size_bytes</span><span class="p">(</span> |
| <span class="n">client</span><span class="p">,</span> <span class="n">query</span><span class="p">))</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Estimated size bytes for query: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">estimated_size_bytes</span><span class="p">)</span> |
| <span class="n">num_splits</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span> |
| <span class="nb">min</span><span class="p">(</span> |
| <span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_NUM_QUERY_SPLITS_MAX</span><span class="p">,</span> |
| <span class="nb">round</span><span class="p">((</span> |
| <span class="nb">float</span><span class="p">(</span><span class="n">estimated_size_bytes</span><span class="p">)</span> <span class="o">/</span> |
| <span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_DEFAULT_BUNDLE_SIZE_BYTES</span><span class="p">))))</span> |
| <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s1">'Failed to fetch estimated size bytes: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">e</span><span class="p">)</span> |
| <span class="c1"># Fallback in case estimated size is unavailable.</span> |
| <span class="n">num_splits</span> <span class="o">=</span> <span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_NUM_QUERY_SPLITS_MIN</span> |
| |
| <span class="k">return</span> <span class="nb">max</span><span class="p">(</span><span class="n">num_splits</span><span class="p">,</span> <span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_NUM_QUERY_SPLITS_MIN</span><span class="p">)</span> |
| |
| <span class="nd">@typehints</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Query</span><span class="p">)</span> |
| <span class="nd">@typehints</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Entity</span><span class="p">)</span> |
| <span class="k">class</span> <span class="nc">_QueryFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A DoFn that fetches entities from Cloud Datastore, for a given query."""</span> |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">query</span><span class="p">,</span> <span class="o">*</span><span class="n">unused_args</span><span class="p">,</span> <span class="o">**</span><span class="n">unused_kwargs</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">query</span><span class="o">.</span><span class="n">namespace</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">query</span><span class="o">.</span><span class="n">namespace</span> <span class="o">=</span> <span class="s1">''</span> |
| <span class="n">_client</span> <span class="o">=</span> <span class="n">helper</span><span class="o">.</span><span class="n">get_client</span><span class="p">(</span><span class="n">query</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> <span class="n">query</span><span class="o">.</span><span class="n">namespace</span><span class="p">)</span> |
| <span class="n">client_query</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">_to_client_query</span><span class="p">(</span><span class="n">_client</span><span class="p">)</span> |
| <span class="c1"># Create request count metric</span> |
| <span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">DatastoreNamespace</span><span class="p">(</span> |
| <span class="n">query</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> <span class="n">query</span><span class="o">.</span><span class="n">namespace</span><span class="p">)</span> |
| <span class="n">labels</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SERVICE_LABEL</span><span class="p">:</span> <span class="s1">'Datastore'</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">METHOD_LABEL</span><span class="p">:</span> <span class="s1">'BatchDatastoreRead'</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">DATASTORE_NAMESPACE_LABEL</span><span class="p">:</span> <span class="n">query</span><span class="o">.</span><span class="n">namespace</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">DATASTORE_PROJECT_ID_LABEL</span><span class="p">:</span> <span class="n">query</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">STATUS_LABEL</span><span class="p">:</span> <span class="s1">'ok'</span> |
| <span class="p">}</span> |
| <span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span> |
| <span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span> |
| <span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">client_entity</span> <span class="ow">in</span> <span class="n">client_query</span><span class="o">.</span><span class="n">fetch</span><span class="p">(</span><span class="n">query</span><span class="o">.</span><span class="n">limit</span><span class="p">):</span> |
| <span class="k">yield</span> <span class="n">types</span><span class="o">.</span><span class="n">Entity</span><span class="o">.</span><span class="n">from_client_entity</span><span class="p">(</span><span class="n">client_entity</span><span class="p">)</span> |
| <span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="s1">'ok'</span><span class="p">)</span> |
| <span class="k">except</span> <span class="p">(</span><span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span><span class="p">)</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="c1"># e.code.value contains the numeric http status code.</span> |
| <span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">code</span><span class="o">.</span><span class="n">value</span><span class="p">)</span> |
| <span class="k">raise</span> |
| <span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> |
| <span class="k">raise</span></div> |
| |
| |
| <span class="k">class</span> <span class="nc">_Mutate</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A ``PTransform`` that writes mutations to Cloud Datastore.</span> |
| |
| <span class="sd"> Only idempotent Datastore mutation operations (upsert and delete) are</span> |
| <span class="sd"> supported, as the commits are retried when failures occur.</span> |
| <span class="sd"> """</span> |
| |
| <span class="c1"># Default hint for the expected number of workers in the ramp-up throttling</span> |
| <span class="c1"># step for write or delete operations.</span> |
| <span class="n">_DEFAULT_HINT_NUM_WORKERS</span> <span class="o">=</span> <span class="mi">500</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">mutate_fn</span><span class="p">,</span> |
| <span class="n">throttle_rampup</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">hint_num_workers</span><span class="o">=</span><span class="n">_DEFAULT_HINT_NUM_WORKERS</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Initializes a Mutate transform.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> mutate_fn: Instance of `DatastoreMutateFn` to use.</span> |
| <span class="sd"> throttle_rampup: Whether to enforce a gradual ramp-up.</span> |
| <span class="sd"> hint_num_workers: A hint for the expected number of workers, used to</span> |
| <span class="sd"> estimate appropriate limits during ramp-up throttling.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_mutate_fn</span> <span class="o">=</span> <span class="n">mutate_fn</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_throttle_rampup</span> <span class="o">=</span> <span class="n">throttle_rampup</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_hint_num_workers</span> <span class="o">=</span> <span class="n">hint_num_workers</span> |
| |
| <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_throttle_rampup</span><span class="p">:</span> |
| <span class="n">throttling_fn</span> <span class="o">=</span> <span class="n">RampupThrottlingFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_hint_num_workers</span><span class="p">)</span> |
| <span class="n">pcoll</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">pcoll</span> <span class="o">|</span> <span class="s1">'Enforce throttling during ramp-up'</span> <span class="o">>></span> <span class="n">ParDo</span><span class="p">(</span><span class="n">throttling_fn</span><span class="p">))</span> |
| <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="s1">'Write Batch to Datastore'</span> <span class="o">>></span> <span class="n">ParDo</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_mutate_fn</span><span class="p">)</span> |
| |
| <span class="k">class</span> <span class="nc">DatastoreMutateFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A ``DoFn`` that write mutations to Datastore.</span> |
| |
| <span class="sd"> Mutations are written in batches, where the maximum batch size is</span> |
| <span class="sd"> `util.WRITE_BATCH_SIZE`.</span> |
| |
| <span class="sd"> Commits are non-transactional. If a commit fails because of a conflict over</span> |
| <span class="sd"> an entity group, the commit will be retried. This means that the mutation</span> |
| <span class="sd"> should be idempotent (`upsert` and `delete` mutations) to prevent duplicate</span> |
| <span class="sd"> data or errors.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">project</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Args:</span> |
| <span class="sd"> project: (str) cloud project id</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_project</span> <span class="o">=</span> <span class="n">project</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_client</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_rpc_successes</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span> |
| <span class="n">_Mutate</span><span class="o">.</span><span class="n">DatastoreMutateFn</span><span class="p">,</span> <span class="s2">"datastoreRpcSuccesses"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_rpc_errors</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span> |
| <span class="n">_Mutate</span><span class="o">.</span><span class="n">DatastoreMutateFn</span><span class="p">,</span> <span class="s2">"datastoreRpcErrors"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_throttled_secs</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span> |
| <span class="n">_Mutate</span><span class="o">.</span><span class="n">DatastoreMutateFn</span><span class="p">,</span> <span class="s2">"cumulativeThrottlingSeconds"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span> <span class="o">=</span> <span class="n">AdaptiveThrottler</span><span class="p">(</span> |
| <span class="n">window_ms</span><span class="o">=</span><span class="mi">120000</span><span class="p">,</span> <span class="n">bucket_ms</span><span class="o">=</span><span class="mi">1000</span><span class="p">,</span> <span class="n">overload_ratio</span><span class="o">=</span><span class="mf">1.25</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">_update_rpc_stats</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">successes</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> <span class="n">errors</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> <span class="n">throttled_secs</span><span class="o">=</span><span class="mi">0</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_rpc_successes</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="n">successes</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_rpc_errors</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="n">errors</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_throttled_secs</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="n">throttled_secs</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_client</span> <span class="o">=</span> <span class="n">helper</span><span class="o">.</span><span class="n">get_client</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_project</span><span class="p">,</span> <span class="n">namespace</span><span class="o">=</span><span class="kc">None</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_init_batch</span><span class="p">()</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch_sizer</span> <span class="o">=</span> <span class="n">util</span><span class="o">.</span><span class="n">DynamicBatchSizer</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_target_batch_size</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch_sizer</span><span class="o">.</span><span class="n">get_batch_size</span><span class="p">(</span> |
| <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">element_to_client_batch_item</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span> |
| |
| <span class="k">def</span> <span class="nf">add_to_batch</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">client_batch_item</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span> |
| |
| <span class="nd">@retry</span><span class="o">.</span><span class="n">with_exponential_backoff</span><span class="p">(</span> |
| <span class="n">num_retries</span><span class="o">=</span><span class="mi">5</span><span class="p">,</span> <span class="n">retry_filter</span><span class="o">=</span><span class="n">helper</span><span class="o">.</span><span class="n">retry_on_rpc_error</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">write_mutations</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">throttler</span><span class="p">,</span> <span class="n">rpc_stats_callback</span><span class="p">,</span> <span class="n">throttle_delay</span><span class="o">=</span><span class="mi">1</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Writes a batch of mutations to Cloud Datastore.</span> |
| |
| <span class="sd"> If a commit fails, it will be retried up to 5 times. All mutations in the</span> |
| <span class="sd"> batch will be committed again, even if the commit was partially</span> |
| <span class="sd"> successful. If the retry limit is exceeded, the last exception from</span> |
| <span class="sd"> Cloud Datastore will be raised.</span> |
| |
| <span class="sd"> Assumes that the Datastore client library does not perform any retries on</span> |
| <span class="sd"> commits. It has not been determined how such retries would interact with</span> |
| <span class="sd"> the retries and throttler used here.</span> |
| <span class="sd"> See ``google.cloud.datastore_v1.gapic.datastore_client_config`` for</span> |
| <span class="sd"> retry config.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> rpc_stats_callback: a function to call with arguments `successes` and</span> |
| <span class="sd"> `failures` and `throttled_secs`; this is called to record successful</span> |
| <span class="sd"> and failed RPCs to Datastore and time spent waiting for throttling.</span> |
| <span class="sd"> throttler: (``apache_beam.io.gcp.datastore.v1new.adaptive_throttler.</span> |
| <span class="sd"> AdaptiveThrottler``)</span> |
| <span class="sd"> Throttler instance used to select requests to be throttled.</span> |
| <span class="sd"> throttle_delay: (:class:`float`) time in seconds to sleep when</span> |
| <span class="sd"> throttled.</span> |
| |
| <span class="sd"> Returns:</span> |
| <span class="sd"> (int) The latency of the successful RPC in milliseconds.</span> |
| <span class="sd"> """</span> |
| <span class="c1"># Client-side throttling.</span> |
| <span class="k">while</span> <span class="n">throttler</span><span class="o">.</span><span class="n">throttle_request</span><span class="p">(</span><span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">):</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s2">"Delaying request for </span><span class="si">%d</span><span class="s2">s due to previous failures"</span><span class="p">,</span> <span class="n">throttle_delay</span><span class="p">)</span> |
| <span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="n">throttle_delay</span><span class="p">)</span> |
| <span class="n">rpc_stats_callback</span><span class="p">(</span><span class="n">throttled_secs</span><span class="o">=</span><span class="n">throttle_delay</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="c1"># this will only happen when we re-try previously failed batch</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_client</span><span class="o">.</span><span class="n">batch</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">begin</span><span class="p">()</span> |
| <span class="k">for</span> <span class="n">element</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch_elements</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">add_to_batch</span><span class="p">(</span><span class="n">element</span><span class="p">)</span> |
| |
| <span class="c1"># Create request count metric</span> |
| <span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">DatastoreNamespace</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_project</span><span class="p">,</span> <span class="s2">""</span><span class="p">)</span> |
| <span class="n">labels</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SERVICE_LABEL</span><span class="p">:</span> <span class="s1">'Datastore'</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">METHOD_LABEL</span><span class="p">:</span> <span class="s1">'BatchDatastoreWrite'</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">DATASTORE_NAMESPACE_LABEL</span><span class="p">:</span> <span class="s2">""</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">DATASTORE_PROJECT_ID_LABEL</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_project</span><span class="p">,</span> |
| <span class="n">monitoring_infos</span><span class="o">.</span><span class="n">STATUS_LABEL</span><span class="p">:</span> <span class="s1">'ok'</span> |
| <span class="p">}</span> |
| |
| <span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span> |
| <span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span> |
| <span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">start_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> |
| <span class="n">end_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> |
| <span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="s1">'ok'</span><span class="p">)</span> |
| |
| <span class="n">rpc_stats_callback</span><span class="p">(</span><span class="n">successes</span><span class="o">=</span><span class="mi">1</span><span class="p">)</span> |
| <span class="n">throttler</span><span class="o">.</span><span class="n">successful_request</span><span class="p">(</span><span class="n">start_time</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">)</span> |
| <span class="n">commit_time_ms</span> <span class="o">=</span> <span class="nb">int</span><span class="p">((</span><span class="n">end_time</span> <span class="o">-</span> <span class="n">start_time</span><span class="p">)</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">commit_time_ms</span> |
| <span class="k">except</span> <span class="p">(</span><span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span><span class="p">)</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="c1"># e.code.value contains the numeric http status code.</span> |
| <span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">code</span><span class="o">.</span><span class="n">value</span><span class="p">)</span> |
| <span class="n">rpc_stats_callback</span><span class="p">(</span><span class="n">errors</span><span class="o">=</span><span class="mi">1</span><span class="p">)</span> |
| <span class="k">raise</span> |
| <span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> |
| <span class="n">rpc_stats_callback</span><span class="p">(</span><span class="n">errors</span><span class="o">=</span><span class="mi">1</span><span class="p">)</span> |
| <span class="k">raise</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="n">client_element</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">element_to_client_batch_item</span><span class="p">(</span><span class="n">element</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch_elements</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">client_element</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">add_to_batch</span><span class="p">(</span><span class="n">client_element</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch_bytes_size</span> <span class="o">+=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">mutations</span><span class="p">[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">_pb</span><span class="o">.</span><span class="n">ByteSize</span><span class="p">()</span> |
| |
| <span class="k">if</span> <span class="p">(</span><span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">mutations</span><span class="p">)</span> <span class="o">>=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_target_batch_size</span> <span class="ow">or</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch_bytes_size</span> <span class="o">></span> <span class="n">util</span><span class="o">.</span><span class="n">WRITE_BATCH_MAX_BYTES_SIZE</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_flush_batch</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch_elements</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_flush_batch</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">_init_batch</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch_bytes_size</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_client</span><span class="o">.</span><span class="n">batch</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">begin</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch_elements</span> <span class="o">=</span> <span class="p">[]</span> |
| |
| <span class="k">def</span> <span class="nf">_flush_batch</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># Flush the current batch of mutations to Cloud Datastore.</span> |
| <span class="n">latency_ms</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_mutations</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="p">,</span> |
| <span class="n">rpc_stats_callback</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_update_rpc_stats</span><span class="p">,</span> |
| <span class="n">throttle_delay</span><span class="o">=</span><span class="n">util</span><span class="o">.</span><span class="n">WRITE_BATCH_TARGET_LATENCY_MS</span> <span class="o">//</span> <span class="mi">1000</span><span class="p">)</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span> |
| <span class="s2">"Successfully wrote </span><span class="si">%d</span><span class="s2"> mutations in </span><span class="si">%d</span><span class="s2">ms."</span><span class="p">,</span> |
| <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">mutations</span><span class="p">),</span> |
| <span class="n">latency_ms</span><span class="p">)</span> |
| |
| <span class="n">now</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">*</span> <span class="mi">1000</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch_sizer</span><span class="o">.</span><span class="n">report_latency</span><span class="p">(</span> |
| <span class="n">now</span><span class="p">,</span> <span class="n">latency_ms</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">mutations</span><span class="p">))</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_target_batch_size</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch_sizer</span><span class="o">.</span><span class="n">get_batch_size</span><span class="p">(</span><span class="n">now</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_init_batch</span><span class="p">()</span> |
| |
| |
| <div class="viewcode-block" id="WriteToDatastore"><!-- NOTE(review): Sphinx viewcode rendering of WriteToDatastore; change the Python module and regenerate rather than hand-editing this listing. Two source-level inconsistencies are visible below: the ValueError text names apache_beam.io.gcp.datastore.v1new.datastoreio.Entity while the isinstance check uses types.Entity, and the docstring says the project of property keys is filled in, but the visible code only fills element.key.project (presumably to_client_entity handles property keys; confirm). --><a class="viewcode-back" href="../../../../../../apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.WriteToDatastore">[docs]</a><span class="nd">@typehints</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Entity</span><span class="p">)</span> |
| <span class="k">class</span> <span class="nc">WriteToDatastore</span><span class="p">(</span><span class="n">_Mutate</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Writes elements of type</span> |
| <span class="sd"> :class:`~apache_beam.io.gcp.datastore.v1new.types.Entity` to Cloud Datastore.</span> |
| |
| <span class="sd"> Entity keys must be complete. The ``project`` field in each key must match the</span> |
| <span class="sd"> project ID passed to this transform. If ``project`` field in entity or</span> |
| <span class="sd"> property key is empty then it is filled with the project ID passed to this</span> |
| <span class="sd"> transform.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">project</span><span class="p">,</span> |
| <span class="n">throttle_rampup</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">hint_num_workers</span><span class="o">=</span><span class="n">_Mutate</span><span class="o">.</span><span class="n">_DEFAULT_HINT_NUM_WORKERS</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Initialize the `WriteToDatastore` transform.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> project: (:class:`str`) The ID of the project to write entities to.</span> |
| <span class="sd"> throttle_rampup: Whether to enforce a gradual ramp-up.</span> |
| <span class="sd"> hint_num_workers: A hint for the expected number of workers, used to</span> |
| <span class="sd"> estimate appropriate limits during ramp-up throttling.</span> |
| <span class="sd"> """</span> |
| <span class="n">mutate_fn</span> <span class="o">=</span> <span class="n">WriteToDatastore</span><span class="o">.</span><span class="n">_DatastoreWriteFn</span><span class="p">(</span><span class="n">project</span><span class="p">)</span> |
| <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">mutate_fn</span><span class="p">,</span> <span class="n">throttle_rampup</span><span class="p">,</span> <span class="n">hint_num_workers</span><span class="p">)</span> |
| |
| <span class="k">class</span> <span class="nc">_DatastoreWriteFn</span><span class="p">(</span><span class="n">_Mutate</span><span class="o">.</span><span class="n">DatastoreMutateFn</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">element_to_client_batch_item</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">types</span><span class="o">.</span><span class="n">Entity</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'apache_beam.io.gcp.datastore.v1new.datastoreio.Entity'</span> |
| <span class="s1">' expected, got: </span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="nb">type</span><span class="p">(</span><span class="n">element</span><span class="p">))</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">element</span><span class="o">.</span><span class="n">key</span><span class="o">.</span><span class="n">project</span><span class="p">:</span> |
| <span class="n">element</span><span class="o">.</span><span class="n">key</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_project</span> |
| <span class="n">client_entity</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">to_client_entity</span><span class="p">()</span> |
| <span class="k">if</span> <span class="n">client_entity</span><span class="o">.</span><span class="n">key</span><span class="o">.</span><span class="n">is_partial</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'Entities to be written to Cloud Datastore must '</span> |
| <span class="s1">'have complete keys:</span><span class="se">\n</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="n">client_entity</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">client_entity</span> |
| |
| <span class="k">def</span> <span class="nf">add_to_batch</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">client_entity</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">put</span><span class="p">(</span><span class="n">client_entity</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="s1">'mutation'</span><span class="p">:</span> <span class="s1">'Write (upsert)'</span><span class="p">,</span> |
| <span class="s1">'project'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_project</span><span class="p">,</span> |
| <span class="p">}</span></div> |
| |
| |
| <div class="viewcode-block" id="DeleteFromDatastore"><!-- NOTE(review): Sphinx viewcode rendering of DeleteFromDatastore; change the Python module and regenerate rather than hand-editing this listing. The ValueError text below names apache_beam.io.gcp.datastore.v1new.datastoreio.Key while the isinstance check uses types.Key, so the message points at the wrong module. --><a class="viewcode-back" href="../../../../../../apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.DeleteFromDatastore">[docs]</a><span class="nd">@typehints</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Key</span><span class="p">)</span> |
| <span class="k">class</span> <span class="nc">DeleteFromDatastore</span><span class="p">(</span><span class="n">_Mutate</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Deletes elements matching input</span> |
| <span class="sd"> :class:`~apache_beam.io.gcp.datastore.v1new.types.Key` elements from Cloud</span> |
| <span class="sd"> Datastore.</span> |
| |
| <span class="sd"> Keys must be complete. The ``project`` field in each key must match the</span> |
| <span class="sd"> project ID passed to this transform. If ``project`` field in key is empty then</span> |
| <span class="sd"> it is filled with the project ID passed to this transform.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">project</span><span class="p">,</span> |
| <span class="n">throttle_rampup</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">hint_num_workers</span><span class="o">=</span><span class="n">_Mutate</span><span class="o">.</span><span class="n">_DEFAULT_HINT_NUM_WORKERS</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Initialize the `DeleteFromDatastore` transform.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> project: (:class:`str`) The ID of the project from which the entities will</span> |
| <span class="sd"> be deleted.</span> |
| <span class="sd"> throttle_rampup: Whether to enforce a gradual ramp-up.</span> |
| <span class="sd"> hint_num_workers: A hint for the expected number of workers, used to</span> |
| <span class="sd"> estimate appropriate limits during ramp-up throttling.</span> |
| <span class="sd"> """</span> |
| <span class="n">mutate_fn</span> <span class="o">=</span> <span class="n">DeleteFromDatastore</span><span class="o">.</span><span class="n">_DatastoreDeleteFn</span><span class="p">(</span><span class="n">project</span><span class="p">)</span> |
| <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">mutate_fn</span><span class="p">,</span> <span class="n">throttle_rampup</span><span class="p">,</span> <span class="n">hint_num_workers</span><span class="p">)</span> |
| |
| <span class="k">class</span> <span class="nc">_DatastoreDeleteFn</span><span class="p">(</span><span class="n">_Mutate</span><span class="o">.</span><span class="n">DatastoreMutateFn</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">element_to_client_batch_item</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">types</span><span class="o">.</span><span class="n">Key</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'apache_beam.io.gcp.datastore.v1new.datastoreio.Key'</span> |
| <span class="s1">' expected, got: </span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="nb">type</span><span class="p">(</span><span class="n">element</span><span class="p">))</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">element</span><span class="o">.</span><span class="n">project</span><span class="p">:</span> |
| <span class="n">element</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_project</span> |
| <span class="n">client_key</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">to_client_key</span><span class="p">()</span> |
| <span class="k">if</span> <span class="n">client_key</span><span class="o">.</span><span class="n">is_partial</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'Keys to be deleted from Cloud Datastore must be '</span> |
| <span class="s1">'complete:</span><span class="se">\n</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="n">client_key</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">client_key</span> |
| |
| <span class="k">def</span> <span class="nf">add_to_batch</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">client_key</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">delete</span><span class="p">(</span><span class="n">client_key</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="s1">'mutation'</span><span class="p">:</span> <span class="s1">'Delete'</span><span class="p">,</span> |
| <span class="s1">'project'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_project</span><span class="p">,</span> |
| <span class="p">}</span></div> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| <footer> |
| <!-- Page footer: copyright notice followed by build attribution links (Sphinx, theme, Read the Docs). --> |
| |
| <hr> |
| |
| <div role="contentinfo"> |
| <!-- NOTE(review): the copyright line carries no holder or year; presumably the Sphinx copyright setting is empty. Confirm and fix it in the docs configuration, not in this generated page. --> |
| <p> |
| © Copyright |
| |
| </p> |
| </div> |
| Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <script> |
| // Once the DOM is ready, switch on the theme's sidebar navigation; the true flag is presumably sticky-nav mode (see sphinx_rtd_theme theme.js), confirm. |
| jQuery(document).ready(function () { |
| SphinxRtdTheme.Navigation.enable(true); |
| }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |