blob: ac56136aa9ca76ebda86946e95330049517ba0df [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>apache_beam.io.gcp.datastore.v1new.datastoreio &mdash; Apache Beam 2.47.0 documentation</title>
  <!-- Script order below matches the generated original; do not reorder without checking inter-script dependencies. -->
  <script src="../../../../../../_static/js/modernizr.min.js"></script>
  <script id="documentation_options" data-url_root="../../../../../../" src="../../../../../../_static/documentation_options.js"></script>
  <script src="../../../../../../_static/jquery.js"></script>
  <script src="../../../../../../_static/underscore.js"></script>
  <script src="../../../../../../_static/doctools.js"></script>
  <script src="../../../../../../_static/language_data.js"></script>
  <!-- NOTE(review): third-party CDN script is loaded without an integrity (SRI) hash; consider pinning it with integrity + crossorigin. -->
  <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
  <script src="../../../../../../_static/js/theme.js"></script>
  <link rel="stylesheet" href="../../../../../../_static/css/theme.css">
  <link rel="stylesheet" href="../../../../../../_static/pygments.css">
  <link rel="index" title="Index" href="../../../../../../genindex.html">
  <link rel="search" title="Search" href="../../../../../../search.html">
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<!-- Sidebar: the outer <nav> is the single navigation landmark (the inner menu div no longer repeats role="navigation"). -->
<nav data-toggle="wy-nav-shift" class="wy-nav-side" aria-label="main navigation">
<div class="wy-side-scroll">
<div class="wy-side-nav-search">
<a href="../../../../../../index.html" class="icon icon-home"> Apache Beam
</a>
<div class="version">
2.47.0
</div>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../../../../search.html" method="get">
<!-- The placeholder is not a label; aria-label supplies the accessible name. type stays "text" in case theme CSS targets input[type=text]. -->
<input type="text" name="q" placeholder="Search docs" aria-label="Search docs">
<input type="hidden" name="check_keywords" value="yes">
<input type="hidden" name="area" value="default">
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.ml.html">apache_beam.ml package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.testing.html">apache_beam.testing package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.utils.html">apache_beam.utils package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.yaml.html">apache_beam.yaml package</a></li>
</ul>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
  <!-- NOTE(review): data-toggle suggests this icon is a JS-bound menu toggle (confirm against theme.js). As a bare <i> it is not keyboard-focusable and has no accessible name. -->
  <i class="fa fa-bars" data-toggle="wy-nav-top"></i>
  <a href="../../../../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<!-- Breadcrumb trail: separators are decorative (hidden from AT); the last item is the current page. -->
<ul class="wy-breadcrumbs">
<li><a href="../../../../../../index.html">Docs</a> <span aria-hidden="true">&raquo;</span></li>
<li><a href="../../../../../index.html">Module code</a> <span aria-hidden="true">&raquo;</span></li>
<li aria-current="page">apache_beam.io.gcp.datastore.v1new.datastoreio</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.io.gcp.datastore.v1new.datastoreio</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd">A connector for reading from and writing to Google Cloud Datastore.</span>
<span class="sd">This module uses the newer google-cloud-datastore client package. Its API was</span>
<span class="sd">different enough to require extensive changes to this and associated modules.</span>
<span class="sd">**Updates to the I/O connector code**</span>
<span class="sd">For any significant updates to this I/O connector, please consider involving</span>
<span class="sd">corresponding code reviewers mentioned in</span>
<span class="sd">https://github.com/apache/beam/blob/master/sdks/python/OWNERS</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="c1"># pytype: skip-file</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">time</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">typehints</span>
<span class="kn">from</span> <span class="nn">apache_beam.internal.metrics.metric</span> <span class="kn">import</span> <span class="n">ServiceCallMetric</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp</span> <span class="kn">import</span> <span class="n">resource_identifiers</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.datastore.v1new</span> <span class="kn">import</span> <span class="n">helper</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.datastore.v1new</span> <span class="kn">import</span> <span class="n">query_splitter</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.datastore.v1new</span> <span class="kn">import</span> <span class="n">types</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.datastore.v1new</span> <span class="kn">import</span> <span class="n">util</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.datastore.v1new.adaptive_throttler</span> <span class="kn">import</span> <span class="n">AdaptiveThrottler</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.datastore.v1new.rampup_throttling_fn</span> <span class="kn">import</span> <span class="n">RampupThrottlingFn</span>
<span class="kn">from</span> <span class="nn">apache_beam.metrics</span> <span class="kn">import</span> <span class="n">monitoring_infos</span>
<span class="kn">from</span> <span class="nn">apache_beam.metrics.metric</span> <span class="kn">import</span> <span class="n">Metrics</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">Create</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">DoFn</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">ParDo</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">PTransform</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">Reshuffle</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils</span> <span class="kn">import</span> <span class="n">retry</span>
<span class="c1"># Protect against environments where datastore library is not available.</span>
<span class="c1"># pylint: disable=wrong-import-order, wrong-import-position</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">apitools.base.py.exceptions</span> <span class="kn">import</span> <span class="n">HttpError</span>
<span class="kn">from</span> <span class="nn">google.api_core.exceptions</span> <span class="kn">import</span> <span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="k">pass</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;ReadFromDatastore&#39;</span><span class="p">,</span> <span class="s1">&#39;WriteToDatastore&#39;</span><span class="p">,</span> <span class="s1">&#39;DeleteFromDatastore&#39;</span><span class="p">]</span>
<span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>
<div class="viewcode-block" id="ReadFromDatastore"><a class="viewcode-back" href="../../../../../../apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore">[docs]</a><span class="nd">@typehints</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Entity</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">ReadFromDatastore</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A ``PTransform`` for querying Google Cloud Datastore.</span>
<span class="sd"> To read a ``PCollection[Entity]`` from a Cloud Datastore ``Query``, use</span>
<span class="sd"> the ``ReadFromDatastore`` transform by providing a `query` to</span>
<span class="sd"> read from. The project and optional namespace are set in the query.</span>
<span class="sd"> The query will be split into multiple queries to allow for parallelism. The</span>
<span class="sd"> degree of parallelism is automatically determined, but can be overridden by</span>
<span class="sd"> setting `num_splits` to a value of 1 or greater.</span>
<span class="sd"> Note: Normally, a runner will read from Cloud Datastore in parallel across</span>
<span class="sd"> many workers. However, when the `query` is configured with a `limit` or if the</span>
<span class="sd"> query contains inequality filters like `GREATER_THAN, LESS_THAN` etc., then</span>
<span class="sd"> all the returned results will be read by a single worker in order to ensure</span>
<span class="sd"> correct data. Since data is read from a single worker, this could have</span>
<span class="sd"> significant impact on the performance of the job. Using a</span>
<span class="sd"> :class:`~apache_beam.transforms.util.Reshuffle` transform after the read in</span>
<span class="sd"> this case might be beneficial for parallelizing work across workers.</span>
<span class="sd"> The semantics for query splitting is defined below:</span>
<span class="sd"> 1. If `num_splits` is equal to 0, then the number of splits will be chosen</span>
<span class="sd"> dynamically at runtime based on the query data size.</span>
<span class="sd"> 2. Any value of `num_splits` greater than</span>
<span class="sd"> `ReadFromDatastore._NUM_QUERY_SPLITS_MAX` will be capped at that value.</span>
<span class="sd"> 3. If the `query` has a user limit set, or contains inequality filters, then</span>
<span class="sd"> `num_splits` will be ignored and no split will be performed.</span>
<span class="sd"> 4. Under certain cases Cloud Datastore is unable to split query to the</span>
<span class="sd"> requested number of splits. In such cases we just use whatever Cloud</span>
<span class="sd"> Datastore returns.</span>
<span class="sd"> See https://developers.google.com/datastore/ for more details on Google Cloud</span>
<span class="sd"> Datastore.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># An upper bound on the number of splits for a query.</span>
<span class="n">_NUM_QUERY_SPLITS_MAX</span> <span class="o">=</span> <span class="mi">50000</span>
<span class="c1"># A lower bound on the number of splits for a query. This is to ensure that</span>
<span class="c1"># we parallelize the query even when Datastore statistics are not available.</span>
<span class="n">_NUM_QUERY_SPLITS_MIN</span> <span class="o">=</span> <span class="mi">12</span>
<span class="c1"># Default bundle size of 64MB.</span>
<span class="n">_DEFAULT_BUNDLE_SIZE_BYTES</span> <span class="o">=</span> <span class="mi">64</span> <span class="o">*</span> <span class="mi">1024</span> <span class="o">*</span> <span class="mi">1024</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">query</span><span class="p">,</span> <span class="n">num_splits</span><span class="o">=</span><span class="mi">0</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Initialize the `ReadFromDatastore` transform.</span>
<span class="sd"> This transform outputs elements of type</span>
<span class="sd"> :class:`~apache_beam.io.gcp.datastore.v1new.types.Entity`.</span>
<span class="sd"> Args:</span>
<span class="sd"> query: (:class:`~apache_beam.io.gcp.datastore.v1new.types.Query`) query</span>
<span class="sd"> used to fetch entities.</span>
<span class="sd"> num_splits: (:class:`int`) (optional) Number of splits for the query.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">query</span><span class="o">.</span><span class="n">project</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;query.project cannot be empty&quot;</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">query</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;query cannot be empty&quot;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">num_splits</span> <span class="o">&lt;</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;num_splits must be greater than or equal 0&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_project</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">project</span>
<span class="c1"># using _namespace conflicts with DisplayData._namespace</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_datastore_namespace</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">namespace</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_query</span> <span class="o">=</span> <span class="n">query</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span> <span class="o">=</span> <span class="n">num_splits</span>
<div class="viewcode-block" id="ReadFromDatastore.expand"><a class="viewcode-back" href="../../../../../../apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="c1"># This is a composite transform involves the following:</span>
<span class="c1"># 1. Create a singleton of the user provided `query` and apply a ``ParDo``</span>
<span class="c1"># that splits the query into `num_splits` queries if possible.</span>
<span class="c1">#</span>
<span class="c1"># If the value of `num_splits` is 0, the number of splits will be</span>
<span class="c1"># computed dynamically based on the size of the data for the `query`.</span>
<span class="c1">#</span>
<span class="c1"># 2. The resulting ``PCollection`` is sharded across workers using a</span>
<span class="c1"># ``Reshuffle`` operation.</span>
<span class="c1">#</span>
<span class="c1"># 3. In the third step, a ``ParDo`` reads entities for each query and</span>
<span class="c1"># outputs a ``PCollection[Entity]``.</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span>
<span class="o">|</span> <span class="s1">&#39;UserQuery&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">Create</span><span class="p">([</span><span class="bp">self</span><span class="o">.</span><span class="n">_query</span><span class="p">])</span>
<span class="o">|</span> <span class="s1">&#39;SplitQuery&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ParDo</span><span class="p">(</span>
<span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_SplitQueryFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span><span class="p">))</span>
<span class="o">|</span> <span class="n">Reshuffle</span><span class="p">()</span>
<span class="o">|</span> <span class="s1">&#39;Read&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ParDo</span><span class="p">(</span><span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_QueryFn</span><span class="p">()))</span></div>
<div class="viewcode-block" id="ReadFromDatastore.display_data"><a class="viewcode-back" href="../../../../../../apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">disp_data</span> <span class="o">=</span> <span class="p">{</span>
<span class="s1">&#39;project&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_query</span><span class="o">.</span><span class="n">project</span><span class="p">,</span>
<span class="s1">&#39;query&#39;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_query</span><span class="p">),</span>
<span class="s1">&#39;num_splits&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span>
<span class="p">}</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_datastore_namespace</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">disp_data</span><span class="p">[</span><span class="s1">&#39;namespace&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_datastore_namespace</span>
<span class="k">return</span> <span class="n">disp_data</span></div>
<span class="nd">@typehints</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Query</span><span class="p">)</span>
<span class="nd">@typehints</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Query</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_SplitQueryFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A `DoFn` that splits a given query into multiple sub-queries.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">num_splits</span><span class="p">):</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span> <span class="o">=</span> <span class="n">num_splits</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">query</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="n">client</span> <span class="o">=</span> <span class="n">helper</span><span class="o">.</span><span class="n">get_client</span><span class="p">(</span><span class="n">query</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> <span class="n">query</span><span class="o">.</span><span class="n">namespace</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="c1"># Short circuit estimating num_splits if split is not possible.</span>
<span class="n">query_splitter</span><span class="o">.</span><span class="n">validate_split</span><span class="p">(</span><span class="n">query</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="n">estimated_num_splits</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_estimated_num_splits</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">query</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">estimated_num_splits</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Splitting the query into </span><span class="si">%d</span><span class="s2"> splits&quot;</span><span class="p">,</span> <span class="n">estimated_num_splits</span><span class="p">)</span>
<span class="n">query_splits</span> <span class="o">=</span> <span class="n">query_splitter</span><span class="o">.</span><span class="n">get_splits</span><span class="p">(</span>
<span class="n">client</span><span class="p">,</span> <span class="n">query</span><span class="p">,</span> <span class="n">estimated_num_splits</span><span class="p">)</span>
<span class="k">except</span> <span class="n">query_splitter</span><span class="o">.</span><span class="n">QuerySplitterError</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s2">&quot;Unable to parallelize the given query: </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">query</span><span class="p">,</span> <span class="n">exc_info</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="n">query_splits</span> <span class="o">=</span> <span class="p">[</span><span class="n">query</span><span class="p">]</span>
<span class="k">return</span> <span class="n">query_splits</span>
<span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">disp_data</span> <span class="o">=</span> <span class="p">{</span><span class="s1">&#39;num_splits&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_num_splits</span><span class="p">}</span>
<span class="k">return</span> <span class="n">disp_data</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">query_latest_statistics_timestamp</span><span class="p">(</span><span class="n">client</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Fetches the latest timestamp of statistics from Cloud Datastore.</span>
<span class="sd"> Cloud Datastore system tables with statistics are periodically updated.</span>
<span class="sd"> This method fetches the latest timestamp (in microseconds) of statistics</span>
<span class="sd"> update using the `__Stat_Total__` table.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">client</span><span class="o">.</span><span class="n">namespace</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">kind</span> <span class="o">=</span> <span class="s1">&#39;__Stat_Total__&#39;</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">kind</span> <span class="o">=</span> <span class="s1">&#39;__Stat_Ns_Total__&#39;</span>
<span class="n">query</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">query</span><span class="p">(</span>
<span class="n">kind</span><span class="o">=</span><span class="n">kind</span><span class="p">,</span> <span class="n">order</span><span class="o">=</span><span class="p">[</span>
<span class="s2">&quot;-timestamp&quot;</span><span class="p">,</span>
<span class="p">])</span>
<span class="n">entities</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">query</span><span class="o">.</span><span class="n">fetch</span><span class="p">(</span><span class="n">limit</span><span class="o">=</span><span class="mi">1</span><span class="p">))</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">entities</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span><span class="s2">&quot;Datastore total statistics unavailable.&quot;</span><span class="p">)</span>
<span class="k">return</span> <span class="n">entities</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="s1">&#39;timestamp&#39;</span><span class="p">]</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">get_estimated_size_bytes</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">query</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Get the estimated size of the data returned by this instance&#39;s query.</span>
<span class="sd"> Cloud Datastore provides no way to get a good estimate of how large the</span>
<span class="sd"> result of a query is going to be. Hence we use the __Stat_Kind__ system</span>
<span class="sd"> table to get size of the entire kind as an approximate estimate, assuming</span>
<span class="sd"> exactly 1 kind is specified in the query.</span>
<span class="sd"> See https://cloud.google.com/datastore/docs/concepts/stats.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">kind_name</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">kind</span>
<span class="n">latest_timestamp</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_SplitQueryFn</span><span class="o">.</span><span class="n">query_latest_statistics_timestamp</span><span class="p">(</span>
<span class="n">client</span><span class="p">))</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;Latest stats timestamp for kind </span><span class="si">%s</span><span class="s1"> is </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">kind_name</span><span class="p">,</span>
<span class="n">latest_timestamp</span><span class="p">)</span>
<span class="k">if</span> <span class="n">client</span><span class="o">.</span><span class="n">namespace</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">kind</span> <span class="o">=</span> <span class="s1">&#39;__Stat_Kind__&#39;</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">kind</span> <span class="o">=</span> <span class="s1">&#39;__Stat_Ns_Kind__&#39;</span>
<span class="n">query</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">kind</span><span class="o">=</span><span class="n">kind</span><span class="p">)</span>
<span class="n">query</span><span class="o">.</span><span class="n">add_filter</span><span class="p">(</span><span class="s1">&#39;kind_name&#39;</span><span class="p">,</span> <span class="s1">&#39;=&#39;</span><span class="p">,</span> <span class="n">kind_name</span><span class="p">)</span>
<span class="n">query</span><span class="o">.</span><span class="n">add_filter</span><span class="p">(</span><span class="s1">&#39;timestamp&#39;</span><span class="p">,</span> <span class="s1">&#39;=&#39;</span><span class="p">,</span> <span class="n">latest_timestamp</span><span class="p">)</span>
<span class="n">entities</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">query</span><span class="o">.</span><span class="n">fetch</span><span class="p">(</span><span class="n">limit</span><span class="o">=</span><span class="mi">1</span><span class="p">))</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">entities</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span>
<span class="s1">&#39;Datastore statistics for kind </span><span class="si">%s</span><span class="s1"> unavailable&#39;</span> <span class="o">%</span> <span class="n">kind_name</span><span class="p">)</span>
<span class="k">return</span> <span class="n">entities</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="s1">&#39;entity_bytes&#39;</span><span class="p">]</span>
<!-- NOTE(review): this page appears to be a generated, syntax-highlighted listing of the Apache Beam 2.47.0 module apache_beam.io.gcp.datastore.v1new.datastoreio; change the Python source and regenerate instead of hand-editing these spans. get_estimated_num_splits: estimated_size_bytes / _DEFAULT_BUNDLE_SIZE_BYTES, rounded and capped at _NUM_QUERY_SPLITS_MAX; if estimation raises any Exception it logs a warning and uses _NUM_QUERY_SPLITS_MIN; the result is never below _NUM_QUERY_SPLITS_MIN. --><span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">get_estimated_num_splits</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">query</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Computes the number of splits to be performed on the query.&quot;&quot;&quot;</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">estimated_size_bytes</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_SplitQueryFn</span><span class="o">.</span><span class="n">get_estimated_size_bytes</span><span class="p">(</span>
<span class="n">client</span><span class="p">,</span> <span class="n">query</span><span class="p">))</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Estimated size bytes for query: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">estimated_size_bytes</span><span class="p">)</span>
<span class="n">num_splits</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span>
<span class="nb">min</span><span class="p">(</span>
<span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_NUM_QUERY_SPLITS_MAX</span><span class="p">,</span>
<span class="nb">round</span><span class="p">((</span>
<span class="nb">float</span><span class="p">(</span><span class="n">estimated_size_bytes</span><span class="p">)</span> <span class="o">/</span>
<span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_DEFAULT_BUNDLE_SIZE_BYTES</span><span class="p">))))</span>
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s1">&#39;Failed to fetch estimated size bytes: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">e</span><span class="p">)</span>
<span class="c1"># Fallback in case estimated size is unavailable.</span>
<span class="n">num_splits</span> <span class="o">=</span> <span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_NUM_QUERY_SPLITS_MIN</span>
<span class="k">return</span> <span class="nb">max</span><span class="p">(</span><span class="n">num_splits</span><span class="p">,</span> <span class="n">ReadFromDatastore</span><span class="o">.</span><span class="n">_NUM_QUERY_SPLITS_MIN</span><span class="p">)</span>
<!-- _QueryFn (Query in, Entity out): process() replaces a None namespace with an empty string, builds a client for query.project and query.namespace, yields every fetched entity converted with types.Entity.from_client_entity, and records a BatchDatastoreRead request-count metric: 'ok' once the fetch loop finishes, e.code.value for ClientError or GoogleAPICallError, or the HttpError object itself; all of these errors are re-raised. --><span class="nd">@typehints</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Query</span><span class="p">)</span>
<span class="nd">@typehints</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Entity</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_QueryFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A DoFn that fetches entities from Cloud Datastore, for a given query.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">query</span><span class="p">,</span> <span class="o">*</span><span class="n">unused_args</span><span class="p">,</span> <span class="o">**</span><span class="n">unused_kwargs</span><span class="p">):</span>
<span class="k">if</span> <span class="n">query</span><span class="o">.</span><span class="n">namespace</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">query</span><span class="o">.</span><span class="n">namespace</span> <span class="o">=</span> <span class="s1">&#39;&#39;</span>
<span class="n">_client</span> <span class="o">=</span> <span class="n">helper</span><span class="o">.</span><span class="n">get_client</span><span class="p">(</span><span class="n">query</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> <span class="n">query</span><span class="o">.</span><span class="n">namespace</span><span class="p">)</span>
<span class="n">client_query</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">_to_client_query</span><span class="p">(</span><span class="n">_client</span><span class="p">)</span>
<span class="c1"># Create request count metric</span>
<span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">DatastoreNamespace</span><span class="p">(</span>
<span class="n">query</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> <span class="n">query</span><span class="o">.</span><span class="n">namespace</span><span class="p">)</span>
<span class="n">labels</span> <span class="o">=</span> <span class="p">{</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SERVICE_LABEL</span><span class="p">:</span> <span class="s1">&#39;Datastore&#39;</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">METHOD_LABEL</span><span class="p">:</span> <span class="s1">&#39;BatchDatastoreRead&#39;</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">DATASTORE_NAMESPACE_LABEL</span><span class="p">:</span> <span class="n">query</span><span class="o">.</span><span class="n">namespace</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">DATASTORE_PROJECT_ID_LABEL</span><span class="p">:</span> <span class="n">query</span><span class="o">.</span><span class="n">project</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">STATUS_LABEL</span><span class="p">:</span> <span class="s1">&#39;ok&#39;</span>
<span class="p">}</span>
<span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span>
<span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span>
<span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">for</span> <span class="n">client_entity</span> <span class="ow">in</span> <span class="n">client_query</span><span class="o">.</span><span class="n">fetch</span><span class="p">(</span><span class="n">query</span><span class="o">.</span><span class="n">limit</span><span class="p">):</span>
<span class="k">yield</span> <span class="n">types</span><span class="o">.</span><span class="n">Entity</span><span class="o">.</span><span class="n">from_client_entity</span><span class="p">(</span><span class="n">client_entity</span><span class="p">)</span>
<span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="s1">&#39;ok&#39;</span><span class="p">)</span>
<span class="k">except</span> <span class="p">(</span><span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span><span class="p">)</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="c1"># e.code.value contains the numeric http status code.</span>
<span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">code</span><span class="o">.</span><span class="n">value</span><span class="p">)</span>
<span class="k">raise</span>
<span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
<!-- The closing div tag at the end of this line ends a wrapper element opened before this chunk; keep it if this listing is regenerated or edited. --><span class="k">raise</span></div>
<!-- _Mutate: PTransform that applies a DatastoreMutateFn, optionally behind ramp-up throttling. _DEFAULT_HINT_NUM_WORKERS (500) is the default hint_num_workers for __init__. --><span class="k">class</span> <span class="nc">_Mutate</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A ``PTransform`` that writes mutations to Cloud Datastore.</span>
<span class="sd"> Only idempotent Datastore mutation operations (upsert and delete) are</span>
<span class="sd"> supported, as the commits are retried when failures occur.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Default hint for the expected number of workers in the ramp-up throttling</span>
<span class="c1"># step for write or delete operations.</span>
<span class="n">_DEFAULT_HINT_NUM_WORKERS</span> <span class="o">=</span> <span class="mi">500</span>
<!-- _Mutate.__init__ only stores its three arguments on self; the hint_num_workers default is bound to _DEFAULT_HINT_NUM_WORKERS when the method is defined. --><span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">mutate_fn</span><span class="p">,</span>
<span class="n">throttle_rampup</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">hint_num_workers</span><span class="o">=</span><span class="n">_DEFAULT_HINT_NUM_WORKERS</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Initializes a Mutate transform.</span>
<span class="sd"> Args:</span>
<span class="sd"> mutate_fn: Instance of `DatastoreMutateFn` to use.</span>
<span class="sd"> throttle_rampup: Whether to enforce a gradual ramp-up.</span>
<span class="sd"> hint_num_workers: A hint for the expected number of workers, used to</span>
<span class="sd"> estimate appropriate limits during ramp-up throttling.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_mutate_fn</span> <span class="o">=</span> <span class="n">mutate_fn</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_throttle_rampup</span> <span class="o">=</span> <span class="n">throttle_rampup</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_hint_num_workers</span> <span class="o">=</span> <span class="n">hint_num_workers</span>
<!-- expand: when throttle_rampup is set, inserts ParDo(RampupThrottlingFn(hint_num_workers)) ahead of the 'Write Batch to Datastore' ParDo that runs the mutate fn. --><span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_throttle_rampup</span><span class="p">:</span>
<span class="n">throttling_fn</span> <span class="o">=</span> <span class="n">RampupThrottlingFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_hint_num_workers</span><span class="p">)</span>
<span class="n">pcoll</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">pcoll</span> <span class="o">|</span> <span class="s1">&#39;Enforce throttling during ramp-up&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ParDo</span><span class="p">(</span><span class="n">throttling_fn</span><span class="p">))</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="s1">&#39;Write Batch to Datastore&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ParDo</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_mutate_fn</span><span class="p">)</span>
<!-- DatastoreMutateFn: base DoFn for batched, non-transactional commits; subclasses must supply element_to_client_batch_item and add_to_batch. NOTE(review): the docstring cites util.WRITE_BATCH_SIZE, but process() in this listing flushes on self._target_batch_size (from util.DynamicBatchSizer) or util.WRITE_BATCH_MAX_BYTES_SIZE, so it may be stale; it also reads "that write" where "that writes" is meant. Fix both in the Python source. --><span class="k">class</span> <span class="nc">DatastoreMutateFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A ``DoFn`` that write mutations to Datastore.</span>
<span class="sd"> Mutations are written in batches, where the maximum batch size is</span>
<span class="sd"> `util.WRITE_BATCH_SIZE`.</span>
<span class="sd"> Commits are non-transactional. If a commit fails because of a conflict over</span>
<span class="sd"> an entity group, the commit will be retried. This means that the mutation</span>
<span class="sd"> should be idempotent (`upsert` and `delete` mutations) to prevent duplicate</span>
<span class="sd"> data or errors.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<!-- __init__: keeps the project id and defers client creation to start_bundle (self._client starts as None); registers the datastoreRpcSuccesses, datastoreRpcErrors and cumulativeThrottlingSeconds counters and an AdaptiveThrottler with window_ms=120000, bucket_ms=1000 and overload_ratio=1.25. --><span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">project</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Args:</span>
<span class="sd"> project: (str) cloud project id</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_project</span> <span class="o">=</span> <span class="n">project</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_client</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_rpc_successes</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span>
<span class="n">_Mutate</span><span class="o">.</span><span class="n">DatastoreMutateFn</span><span class="p">,</span> <span class="s2">&quot;datastoreRpcSuccesses&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_rpc_errors</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span>
<span class="n">_Mutate</span><span class="o">.</span><span class="n">DatastoreMutateFn</span><span class="p">,</span> <span class="s2">&quot;datastoreRpcErrors&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_throttled_secs</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span>
<span class="n">_Mutate</span><span class="o">.</span><span class="n">DatastoreMutateFn</span><span class="p">,</span> <span class="s2">&quot;cumulativeThrottlingSeconds&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span> <span class="o">=</span> <span class="n">AdaptiveThrottler</span><span class="p">(</span>
<span class="n">window_ms</span><span class="o">=</span><span class="mi">120000</span><span class="p">,</span> <span class="n">bucket_ms</span><span class="o">=</span><span class="mi">1000</span><span class="p">,</span> <span class="n">overload_ratio</span><span class="o">=</span><span class="mf">1.25</span><span class="p">)</span>
<!-- _update_rpc_stats: adds the given amounts to the successes, errors and throttled-seconds counters; _flush_batch passes it to write_mutations as rpc_stats_callback. --><span class="k">def</span> <span class="nf">_update_rpc_stats</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">successes</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> <span class="n">errors</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> <span class="n">throttled_secs</span><span class="o">=</span><span class="mi">0</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_rpc_successes</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="n">successes</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_rpc_errors</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="n">errors</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_throttled_secs</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="n">throttled_secs</span><span class="p">)</span>
<!-- start_bundle: creates a client with namespace=None, then starts a fresh batch via _init_batch (which reads self._client, so the client must exist first), then asks a new util.DynamicBatchSizer for the target batch size at the current time in milliseconds. --><span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_client</span> <span class="o">=</span> <span class="n">helper</span><span class="o">.</span><span class="n">get_client</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_project</span><span class="p">,</span> <span class="n">namespace</span><span class="o">=</span><span class="kc">None</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_init_batch</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch_sizer</span> <span class="o">=</span> <span class="n">util</span><span class="o">.</span><span class="n">DynamicBatchSizer</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_target_batch_size</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch_sizer</span><span class="o">.</span><span class="n">get_batch_size</span><span class="p">(</span>
<span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">)</span>
<!-- Abstract: subclasses convert a pipeline element into the client-side item that process() stores and passes to add_to_batch. --><span class="k">def</span> <span class="nf">element_to_client_batch_item</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span>
<!-- Abstract: subclasses add one mutation per call to self._batch; process() reads self._batch.mutations[-1] right after calling this, and write_mutations replays self._batch_elements through it when rebuilding a batch. --><span class="k">def</span> <span class="nf">add_to_batch</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">client_batch_item</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span>
<!-- write_mutations is wrapped in retry.with_exponential_backoff(num_retries=5, retry_filter=helper.retry_on_rpc_error). NOTE(review): the docstring names the callback arguments successes, failures and throttled_secs, but _update_rpc_stats and the calls below use errors, not failures. It also documents throttle_delay as a float while the log line formats it with %d, which would truncate fractional seconds. --><span class="nd">@retry</span><span class="o">.</span><span class="n">with_exponential_backoff</span><span class="p">(</span>
<span class="n">num_retries</span><span class="o">=</span><span class="mi">5</span><span class="p">,</span> <span class="n">retry_filter</span><span class="o">=</span><span class="n">helper</span><span class="o">.</span><span class="n">retry_on_rpc_error</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">write_mutations</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">throttler</span><span class="p">,</span> <span class="n">rpc_stats_callback</span><span class="p">,</span> <span class="n">throttle_delay</span><span class="o">=</span><span class="mi">1</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Writes a batch of mutations to Cloud Datastore.</span>
<span class="sd"> If a commit fails, it will be retried up to 5 times. All mutations in the</span>
<span class="sd"> batch will be committed again, even if the commit was partially</span>
<span class="sd"> successful. If the retry limit is exceeded, the last exception from</span>
<span class="sd"> Cloud Datastore will be raised.</span>
<span class="sd"> Assumes that the Datastore client library does not perform any retries on</span>
<span class="sd"> commits. It has not been determined how such retries would interact with</span>
<span class="sd"> the retries and throttler used here.</span>
<span class="sd"> See ``google.cloud.datastore_v1.gapic.datastore_client_config`` for</span>
<span class="sd"> retry config.</span>
<span class="sd"> Args:</span>
<span class="sd"> rpc_stats_callback: a function to call with arguments `successes` and</span>
<span class="sd"> `failures` and `throttled_secs`; this is called to record successful</span>
<span class="sd"> and failed RPCs to Datastore and time spent waiting for throttling.</span>
<span class="sd"> throttler: (``apache_beam.io.gcp.datastore.v1new.adaptive_throttler.</span>
<span class="sd"> AdaptiveThrottler``)</span>
<span class="sd"> Throttler instance used to select requests to be throttled.</span>
<span class="sd"> throttle_delay: (:class:`float`) time in seconds to sleep when</span>
<span class="sd"> throttled.</span>
<span class="sd"> Returns:</span>
<span class="sd"> (int) The latency of the successful RPC in milliseconds.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Client-side throttling.</span>
<span class="k">while</span> <span class="n">throttler</span><span class="o">.</span><span class="n">throttle_request</span><span class="p">(</span><span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">):</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s2">&quot;Delaying request for </span><span class="si">%d</span><span class="s2">s due to previous failures&quot;</span><span class="p">,</span> <span class="n">throttle_delay</span><span class="p">)</span>
<span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="n">throttle_delay</span><span class="p">)</span>
<span class="n">rpc_stats_callback</span><span class="p">(</span><span class="n">throttled_secs</span><span class="o">=</span><span class="n">throttle_delay</span><span class="p">)</span>
<!-- self._batch is None only after the ClientError or GoogleAPICallError branch below has reset it; in that case the batch is rebuilt from self._batch_elements before committing again. --><span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># this will only happen when we re-try previously failed batch</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_client</span><span class="o">.</span><span class="n">batch</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">begin</span><span class="p">()</span>
<span class="k">for</span> <span class="n">element</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch_elements</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">add_to_batch</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="c1"># Create request count metric</span>
<!-- The write metric always reports namespace "" here; start_bundle creates the client with namespace=None. --><span class="n">resource</span> <span class="o">=</span> <span class="n">resource_identifiers</span><span class="o">.</span><span class="n">DatastoreNamespace</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_project</span><span class="p">,</span> <span class="s2">&quot;&quot;</span><span class="p">)</span>
<span class="n">labels</span> <span class="o">=</span> <span class="p">{</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">SERVICE_LABEL</span><span class="p">:</span> <span class="s1">&#39;Datastore&#39;</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">METHOD_LABEL</span><span class="p">:</span> <span class="s1">&#39;BatchDatastoreWrite&#39;</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">RESOURCE_LABEL</span><span class="p">:</span> <span class="n">resource</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">DATASTORE_NAMESPACE_LABEL</span><span class="p">:</span> <span class="s2">&quot;&quot;</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">DATASTORE_PROJECT_ID_LABEL</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_project</span><span class="p">,</span>
<span class="n">monitoring_infos</span><span class="o">.</span><span class="n">STATUS_LABEL</span><span class="p">:</span> <span class="s1">&#39;ok&#39;</span>
<span class="p">}</span>
<span class="n">service_call_metric</span> <span class="o">=</span> <span class="n">ServiceCallMetric</span><span class="p">(</span>
<span class="n">request_count_urn</span><span class="o">=</span><span class="n">monitoring_infos</span><span class="o">.</span><span class="n">API_REQUEST_COUNT_URN</span><span class="p">,</span>
<span class="n">base_labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">start_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="n">end_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span>
<span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="s1">&#39;ok&#39;</span><span class="p">)</span>
<span class="n">rpc_stats_callback</span><span class="p">(</span><span class="n">successes</span><span class="o">=</span><span class="mi">1</span><span class="p">)</span>
<span class="n">throttler</span><span class="o">.</span><span class="n">successful_request</span><span class="p">(</span><span class="n">start_time</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">)</span>
<span class="n">commit_time_ms</span> <span class="o">=</span> <span class="nb">int</span><span class="p">((</span><span class="n">end_time</span> <span class="o">-</span> <span class="n">start_time</span><span class="p">)</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">)</span>
<span class="k">return</span> <span class="n">commit_time_ms</span>
<span class="k">except</span> <span class="p">(</span><span class="n">ClientError</span><span class="p">,</span> <span class="n">GoogleAPICallError</span><span class="p">)</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="o">=</span> <span class="kc">None</span>
<span class="c1"># e.code.value contains the numeric http status code.</span>
<span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">code</span><span class="o">.</span><span class="n">value</span><span class="p">)</span>
<span class="n">rpc_stats_callback</span><span class="p">(</span><span class="n">errors</span><span class="o">=</span><span class="mi">1</span><span class="p">)</span>
<span class="k">raise</span>
<!-- NOTE(review): unlike the branch above, this HttpError branch leaves self._batch set, so if the retry filter retries it, commit() is called again on the batch whose commit just failed instead of on a rebuilt batch. Whether that works depends on the client library Batch state handling; confirm and consider resetting self._batch here as well (in the Python source). --><span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="n">service_call_metric</span><span class="o">.</span><span class="n">call</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
<span class="n">rpc_stats_callback</span><span class="p">(</span><span class="n">errors</span><span class="o">=</span><span class="mi">1</span><span class="p">)</span>
<span class="k">raise</span>
<!-- process: stores the converted item in self._batch_elements so a failed batch can be rebuilt, adds it to the batch, and accumulates the serialized size of the new mutation. It flushes once the mutation count reaches the target size or the byte total exceeds util.WRITE_BATCH_MAX_BYTES_SIZE. The size check runs after the mutation is added, so a flushed batch can exceed that limit by its last mutation. --><span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="n">client_element</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">element_to_client_batch_item</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch_elements</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">client_element</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">add_to_batch</span><span class="p">(</span><span class="n">client_element</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch_bytes_size</span> <span class="o">+=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">mutations</span><span class="p">[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">_pb</span><span class="o">.</span><span class="n">ByteSize</span><span class="p">()</span>
<span class="k">if</span> <span class="p">(</span><span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">mutations</span><span class="p">)</span> <span class="o">&gt;=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_target_batch_size</span> <span class="ow">or</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch_bytes_size</span> <span class="o">&gt;</span> <span class="n">util</span><span class="o">.</span><span class="n">WRITE_BATCH_MAX_BYTES_SIZE</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_flush_batch</span><span class="p">()</span>
<!-- finish_bundle: flushes any items still buffered in self._batch_elements at the end of the bundle. --><span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch_elements</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_flush_batch</span><span class="p">()</span>
<!-- _init_batch: resets the byte counter and the element list and begins a new client batch from self._client. --><span class="k">def</span> <span class="nf">_init_batch</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch_bytes_size</span> <span class="o">=</span> <span class="mi">0</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_client</span><span class="o">.</span><span class="n">batch</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">begin</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch_elements</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">def</span> <span class="nf">_flush_batch</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># Flush the current batch of mutations to Cloud Datastore.</span>
<span class="n">latency_ms</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_mutations</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="p">,</span>
<span class="n">rpc_stats_callback</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_update_rpc_stats</span><span class="p">,</span>
<span class="n">throttle_delay</span><span class="o">=</span><span class="n">util</span><span class="o">.</span><span class="n">WRITE_BATCH_TARGET_LATENCY_MS</span> <span class="o">//</span> <span class="mi">1000</span><span class="p">)</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span>
<span class="s2">&quot;Successfully wrote </span><span class="si">%d</span><span class="s2"> mutations in </span><span class="si">%d</span><span class="s2">ms.&quot;</span><span class="p">,</span>
<span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">mutations</span><span class="p">),</span>
<span class="n">latency_ms</span><span class="p">)</span>
<span class="n">now</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">*</span> <span class="mi">1000</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch_sizer</span><span class="o">.</span><span class="n">report_latency</span><span class="p">(</span>
<span class="n">now</span><span class="p">,</span> <span class="n">latency_ms</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">mutations</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_target_batch_size</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batch_sizer</span><span class="o">.</span><span class="n">get_batch_size</span><span class="p">(</span><span class="n">now</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_init_batch</span><span class="p">()</span>
<div class="viewcode-block" id="WriteToDatastore"><a class="viewcode-back" href="../../../../../../apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.WriteToDatastore">[docs]</a><span class="nd">@typehints</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Entity</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">WriteToDatastore</span><span class="p">(</span><span class="n">_Mutate</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Writes elements of type</span>
<span class="sd"> :class:`~apache_beam.io.gcp.datastore.v1new.types.Entity` to Cloud Datastore.</span>
<span class="sd"> Entity keys must be complete. The ``project`` field in each key must match the</span>
<span class="sd"> project ID passed to this transform. If ``project`` field in entity or</span>
<span class="sd"> property key is empty then it is filled with the project ID passed to this</span>
<span class="sd"> transform.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">project</span><span class="p">,</span>
<span class="n">throttle_rampup</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">hint_num_workers</span><span class="o">=</span><span class="n">_Mutate</span><span class="o">.</span><span class="n">_DEFAULT_HINT_NUM_WORKERS</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Initialize the `WriteToDatastore` transform.</span>
<span class="sd"> Args:</span>
<span class="sd"> project: (:class:`str`) The ID of the project to write entities to.</span>
<span class="sd"> throttle_rampup: Whether to enforce a gradual ramp-up.</span>
<span class="sd"> hint_num_workers: A hint for the expected number of workers, used to</span>
<span class="sd"> estimate appropriate limits during ramp-up throttling.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">mutate_fn</span> <span class="o">=</span> <span class="n">WriteToDatastore</span><span class="o">.</span><span class="n">_DatastoreWriteFn</span><span class="p">(</span><span class="n">project</span><span class="p">)</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">mutate_fn</span><span class="p">,</span> <span class="n">throttle_rampup</span><span class="p">,</span> <span class="n">hint_num_workers</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_DatastoreWriteFn</span><span class="p">(</span><span class="n">_Mutate</span><span class="o">.</span><span class="n">DatastoreMutateFn</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">element_to_client_batch_item</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">types</span><span class="o">.</span><span class="n">Entity</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;apache_beam.io.gcp.datastore.v1new.datastoreio.Entity&#39;</span>
<span class="s1">&#39; expected, got: </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="nb">type</span><span class="p">(</span><span class="n">element</span><span class="p">))</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">element</span><span class="o">.</span><span class="n">key</span><span class="o">.</span><span class="n">project</span><span class="p">:</span>
<span class="n">element</span><span class="o">.</span><span class="n">key</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_project</span>
<span class="n">client_entity</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">to_client_entity</span><span class="p">()</span>
<span class="k">if</span> <span class="n">client_entity</span><span class="o">.</span><span class="n">key</span><span class="o">.</span><span class="n">is_partial</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;Entities to be written to Cloud Datastore must &#39;</span>
<span class="s1">&#39;have complete keys:</span><span class="se">\n</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">client_entity</span><span class="p">)</span>
<span class="k">return</span> <span class="n">client_entity</span>
<span class="k">def</span> <span class="nf">add_to_batch</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">client_entity</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">put</span><span class="p">(</span><span class="n">client_entity</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">&#39;mutation&#39;</span><span class="p">:</span> <span class="s1">&#39;Write (upsert)&#39;</span><span class="p">,</span>
<span class="s1">&#39;project&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_project</span><span class="p">,</span>
<span class="p">}</span></div>
<div class="viewcode-block" id="DeleteFromDatastore"><a class="viewcode-back" href="../../../../../../apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.DeleteFromDatastore">[docs]</a><span class="nd">@typehints</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">types</span><span class="o">.</span><span class="n">Key</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">DeleteFromDatastore</span><span class="p">(</span><span class="n">_Mutate</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Deletes elements matching input</span>
<span class="sd"> :class:`~apache_beam.io.gcp.datastore.v1new.types.Key` elements from Cloud</span>
<span class="sd"> Datastore.</span>
<span class="sd"> Keys must be complete. The ``project`` field in each key must match the</span>
<span class="sd"> project ID passed to this transform. If ``project`` field in key is empty then</span>
<span class="sd"> it is filled with the project ID passed to this transform.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">project</span><span class="p">,</span>
<span class="n">throttle_rampup</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">hint_num_workers</span><span class="o">=</span><span class="n">_Mutate</span><span class="o">.</span><span class="n">_DEFAULT_HINT_NUM_WORKERS</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Initialize the `DeleteFromDatastore` transform.</span>
<span class="sd"> Args:</span>
<span class="sd"> project: (:class:`str`) The ID of the project from which the entities will</span>
<span class="sd"> be deleted.</span>
<span class="sd"> throttle_rampup: Whether to enforce a gradual ramp-up.</span>
<span class="sd"> hint_num_workers: A hint for the expected number of workers, used to</span>
<span class="sd"> estimate appropriate limits during ramp-up throttling.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">mutate_fn</span> <span class="o">=</span> <span class="n">DeleteFromDatastore</span><span class="o">.</span><span class="n">_DatastoreDeleteFn</span><span class="p">(</span><span class="n">project</span><span class="p">)</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">mutate_fn</span><span class="p">,</span> <span class="n">throttle_rampup</span><span class="p">,</span> <span class="n">hint_num_workers</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_DatastoreDeleteFn</span><span class="p">(</span><span class="n">_Mutate</span><span class="o">.</span><span class="n">DatastoreMutateFn</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">element_to_client_batch_item</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">types</span><span class="o">.</span><span class="n">Key</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;apache_beam.io.gcp.datastore.v1new.datastoreio.Key&#39;</span>
<span class="s1">&#39; expected, got: </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="nb">type</span><span class="p">(</span><span class="n">element</span><span class="p">))</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">element</span><span class="o">.</span><span class="n">project</span><span class="p">:</span>
<span class="n">element</span><span class="o">.</span><span class="n">project</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_project</span>
<span class="n">client_key</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">to_client_key</span><span class="p">()</span>
<span class="k">if</span> <span class="n">client_key</span><span class="o">.</span><span class="n">is_partial</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;Keys to be deleted from Cloud Datastore must be &#39;</span>
<span class="s1">&#39;complete:</span><span class="se">\n</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">client_key</span><span class="p">)</span>
<span class="k">return</span> <span class="n">client_key</span>
<span class="k">def</span> <span class="nf">add_to_batch</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">client_key</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batch</span><span class="o">.</span><span class="n">delete</span><span class="p">(</span><span class="n">client_key</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">&#39;mutation&#39;</span><span class="p">:</span> <span class="s1">&#39;Delete&#39;</span><span class="p">,</span>
<span class="s1">&#39;project&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_project</span><span class="p">,</span>
<span class="p">}</span></div>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
&copy; Copyright
</p>
</div>
Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script type="text/javascript">
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>