blob: e6c413c69aa8567504801705b85128453dab6003 [file] [log] [blame]
<!DOCTYPE html>
<html class="writer-html5" lang="en" data-content_root="../../../">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>apache_beam.io.requestresponse &mdash; Apache Beam 2.67.0 documentation</title>
<link rel="stylesheet" type="text/css" href="../../../_static/pygments.css?v=b86133f3" />
<link rel="stylesheet" type="text/css" href="../../../_static/css/theme.css?v=e59714d7" />
<script src="../../../_static/jquery.js?v=5d32c60e"></script>
<script src="../../../_static/_sphinx_javascript_frameworks_compat.js?v=2cd50e6c"></script>
<script src="../../../_static/documentation_options.js?v=959b4fbe"></script>
<script src="../../../_static/doctools.js?v=9a2dae69"></script>
<script src="../../../_static/sphinx_highlight.js?v=dc90522c"></script>
<script src="../../../_static/js/theme.js"></script>
<link rel="index" title="Index" href="../../../genindex.html" />
<link rel="search" title="Search" href="../../../search.html" />
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../index.html" class="icon icon-home">
Apache Beam
</a>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div><div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="Navigation menu">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li>
</ul>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"><nav class="wy-nav-top" aria-label="Mobile navigation menu" >
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="Page navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../index.html" class="icon icon-home" aria-label="Home"></a></li>
<li class="breadcrumb-item"><a href="../../index.html">Module code</a></li>
<li class="breadcrumb-item active">apache_beam.io.requestresponse</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.io.requestresponse</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;``PTransform`` for reading from and writing to Web APIs.&quot;&quot;&quot;</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">abc</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">concurrent.futures</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">contextlib</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">enum</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">json</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">logging</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">sys</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">time</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">datetime</span><span class="w"> </span><span class="kn">import</span> <span class="n">timedelta</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Any</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Dict</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Generic</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">List</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Mapping</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Optional</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Tuple</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">TypeVar</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Union</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">redis</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">google.api_core.exceptions</span><span class="w"> </span><span class="kn">import</span> <span class="n">TooManyRequests</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">apache_beam</span><span class="w"> </span><span class="k">as</span><span class="w"> </span><span class="nn">beam</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam</span><span class="w"> </span><span class="kn">import</span> <span class="n">pvalue</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam.coders</span><span class="w"> </span><span class="kn">import</span> <span class="n">coders</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam.io.components.adaptive_throttler</span><span class="w"> </span><span class="kn">import</span> <span class="n">AdaptiveThrottler</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam.metrics</span><span class="w"> </span><span class="kn">import</span> <span class="n">Metrics</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam.transforms.util</span><span class="w"> </span><span class="kn">import</span> <span class="n">BatchElements</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam.utils</span><span class="w"> </span><span class="kn">import</span> <span class="n">retry</span>
<span class="n">RequestT</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s1">&#39;RequestT&#39;</span><span class="p">)</span>
<span class="n">ResponseT</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s1">&#39;ResponseT&#39;</span><span class="p">)</span>
<span class="c1"># DEFAULT_TIMEOUT_SECS represents the time interval for completing the request</span>
<span class="c1"># with external source.</span>
<span class="n">DEFAULT_TIMEOUT_SECS</span> <span class="o">=</span> <span class="mi">30</span>
<span class="c1"># DEFAULT_CACHE_ENTRY_TTL_SEC represents the total time-to-live</span>
<span class="c1"># for cache record.</span>
<span class="n">DEFAULT_CACHE_ENTRY_TTL_SEC</span> <span class="o">=</span> <span class="mi">24</span> <span class="o">*</span> <span class="mi">60</span> <span class="o">*</span> <span class="mi">60</span>
<span class="n">MSEC_TO_SEC</span> <span class="o">=</span> <span class="mi">1000</span>
<span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span>
<span class="s1">&#39;RequestResponseIO&#39;</span><span class="p">,</span>
<span class="s1">&#39;ExponentialBackOffRepeater&#39;</span><span class="p">,</span>
<span class="s1">&#39;DefaultThrottler&#39;</span><span class="p">,</span>
<span class="s1">&#39;NoOpsRepeater&#39;</span><span class="p">,</span>
<span class="s1">&#39;RedisCache&#39;</span><span class="p">,</span>
<span class="p">]</span>
<span class="k">class</span><span class="w"> </span><span class="nc">UserCodeExecutionException</span><span class="p">(</span><span class="ne">Exception</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Base class for errors related to calling Web APIs.&quot;&quot;&quot;</span>
<span class="k">class</span><span class="w"> </span><span class="nc">UserCodeQuotaException</span><span class="p">(</span><span class="n">UserCodeExecutionException</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Extends ``UserCodeExecutionException`` to signal specifically that</span>
<span class="sd"> the Web API client encountered a Quota or API overuse related error.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">class</span><span class="w"> </span><span class="nc">UserCodeTimeoutException</span><span class="p">(</span><span class="n">UserCodeExecutionException</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Extends ``UserCodeExecutionException`` to signal a user code timeout.&quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">retry_on_exception</span><span class="p">(</span><span class="n">exception</span><span class="p">:</span> <span class="ne">Exception</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;retry on exceptions caused by unavailability of the remote server.&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="nb">isinstance</span><span class="p">(</span>
<span class="n">exception</span><span class="p">,</span>
<span class="p">(</span><span class="n">TooManyRequests</span><span class="p">,</span> <span class="n">UserCodeTimeoutException</span><span class="p">,</span> <span class="n">UserCodeQuotaException</span><span class="p">))</span>
<span class="k">class</span><span class="w"> </span><span class="nc">_MetricsCollector</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A metrics collector that tracks RequestResponseIO related usage.&quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">namespace</span><span class="p">:</span> <span class="nb">str</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Args:</span>
<span class="sd"> namespace: Namespace for the metrics.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">requests</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">&#39;requests&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">responses</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">&#39;responses&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">failures</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">&#39;failures&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">throttled_requests</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">&#39;throttled_requests&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">throttled_secs</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span>
<span class="n">namespace</span><span class="p">,</span> <span class="s1">&#39;cumulativeThrottlingSeconds&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">timeout_requests</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">&#39;requests_timed_out&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">call_counter</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">&#39;call_invocations&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">setup_counter</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">&#39;setup_counter&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">teardown_counter</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">&#39;teardown_counter&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">backoff_counter</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">&#39;backoff_counter&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sleeper_counter</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">&#39;sleeper_counter&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">should_backoff_counter</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span>
<span class="n">namespace</span><span class="p">,</span> <span class="s1">&#39;should_backoff_counter&#39;</span><span class="p">)</span>
<span class="k">class</span><span class="w"> </span><span class="nc">Caller</span><span class="p">(</span><span class="n">contextlib</span><span class="o">.</span><span class="n">AbstractContextManager</span><span class="p">,</span>
<span class="n">abc</span><span class="o">.</span><span class="n">ABC</span><span class="p">,</span>
<span class="n">Generic</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">]):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Interface for user custom code intended for API calls.</span>
<span class="sd"> For setup and teardown of clients when applicable, implement the</span>
<span class="sd"> ``__enter__`` and ``__exit__`` methods respectively.&quot;&quot;&quot;</span>
<span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">ResponseT</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Calls a Web API with the ``RequestT`` and returns a</span>
<span class="sd"> ``ResponseT``. ``RequestResponseIO`` expects implementations of the</span>
<span class="sd"> ``__call__`` method to throw either a ``UserCodeExecutionException``,</span>
<span class="sd"> ``UserCodeQuotaException``, or ``UserCodeTimeoutException``.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exc_type</span><span class="p">,</span> <span class="n">exc_val</span><span class="p">,</span> <span class="n">exc_tb</span><span class="p">):</span>
<span class="k">return</span> <span class="kc">None</span>
<span class="k">def</span><span class="w"> </span><span class="nf">get_cache_key</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">str</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns the request to be cached.</span>
<span class="sd"> This is how the response will be looked up in the cache as well.</span>
<span class="sd"> By default, entire request is cached as the key for the cache.</span>
<span class="sd"> Implement this method to override the key for the cache.</span>
<span class="sd"> For example, in `BigTableEnrichmentHandler`, the row key for the element</span>
<span class="sd"> is returned here.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="s2">&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">batch_elements_kwargs</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Mapping</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns a kwargs suitable for `beam.BatchElements`.&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="p">{}</span>
<span class="k">class</span><span class="w"> </span><span class="nc">ShouldBackOff</span><span class="p">(</span><span class="n">abc</span><span class="o">.</span><span class="n">ABC</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Provides mechanism to apply adaptive throttling.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="k">class</span><span class="w"> </span><span class="nc">Repeater</span><span class="p">(</span><span class="n">abc</span><span class="o">.</span><span class="n">ABC</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Provides mechanism to repeat requests for a</span>
<span class="sd"> configurable condition.&quot;&quot;&quot;</span>
<span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span>
<span class="k">def</span><span class="w"> </span><span class="nf">repeat</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span>
<span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">,</span>
<span class="n">timeout</span><span class="p">:</span> <span class="nb">float</span><span class="p">,</span>
<span class="n">metrics_collector</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">_MetricsCollector</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">ResponseT</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Implements a repeater strategy for RequestResponseIO when a repeater</span>
<span class="sd"> is enabled.</span>
<span class="sd"> Args:</span>
<span class="sd"> caller: a `~apache_beam.io.requestresponse.Caller` object that</span>
<span class="sd"> calls the API.</span>
<span class="sd"> request: input request to repeat.</span>
<span class="sd"> timeout: time to wait for the request to complete.</span>
<span class="sd"> metrics_collector: (Optional) a</span>
<span class="sd"> `~apache_beam.io.requestresponse._MetricsCollector` object</span>
<span class="sd"> to collect the metrics for RequestResponseIO.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="k">def</span><span class="w"> </span><span class="nf">_execute_request</span><span class="p">(</span>
<span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span>
<span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">,</span>
<span class="n">timeout</span><span class="p">:</span> <span class="nb">float</span><span class="p">,</span>
<span class="n">metrics_collector</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">_MetricsCollector</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">ResponseT</span><span class="p">:</span>
<span class="k">with</span> <span class="n">concurrent</span><span class="o">.</span><span class="n">futures</span><span class="o">.</span><span class="n">ThreadPoolExecutor</span><span class="p">()</span> <span class="k">as</span> <span class="n">executor</span><span class="p">:</span>
<span class="n">future</span> <span class="o">=</span> <span class="n">executor</span><span class="o">.</span><span class="n">submit</span><span class="p">(</span><span class="n">caller</span><span class="p">,</span> <span class="n">request</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">return</span> <span class="n">future</span><span class="o">.</span><span class="n">result</span><span class="p">(</span><span class="n">timeout</span><span class="o">=</span><span class="n">timeout</span><span class="p">)</span>
<span class="k">except</span> <span class="n">TooManyRequests</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;request could not be completed. got code </span><span class="si">%i</span><span class="s1"> from the service.&#39;</span><span class="p">,</span>
<span class="n">e</span><span class="o">.</span><span class="n">code</span><span class="p">)</span>
<span class="k">raise</span> <span class="n">e</span>
<span class="k">except</span> <span class="n">concurrent</span><span class="o">.</span><span class="n">futures</span><span class="o">.</span><span class="n">TimeoutError</span><span class="p">:</span>
<span class="k">if</span> <span class="n">metrics_collector</span><span class="p">:</span>
<span class="n">metrics_collector</span><span class="o">.</span><span class="n">timeout_requests</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span>
<span class="k">raise</span> <span class="n">UserCodeTimeoutException</span><span class="p">(</span>
<span class="sa">f</span><span class="s1">&#39;Timeout </span><span class="si">{</span><span class="n">timeout</span><span class="si">}</span><span class="s1"> exceeded &#39;</span>
<span class="sa">f</span><span class="s1">&#39;while completing request: </span><span class="si">{</span><span class="n">request</span><span class="si">}</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">RuntimeError</span><span class="p">:</span>
<span class="k">if</span> <span class="n">metrics_collector</span><span class="p">:</span>
<span class="n">metrics_collector</span><span class="o">.</span><span class="n">failures</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span>
<span class="k">raise</span> <span class="n">UserCodeExecutionException</span><span class="p">(</span><span class="s1">&#39;could not complete request&#39;</span><span class="p">)</span>
<div class="viewcode-block" id="ExponentialBackOffRepeater">
<a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.ExponentialBackOffRepeater">[docs]</a>
<span class="k">class</span><span class="w"> </span><span class="nc">ExponentialBackOffRepeater</span><span class="p">(</span><span class="n">Repeater</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Configure exponential backoff retry strategy.</span>
<span class="sd"> It retries for exceptions due to the remote service such as</span>
<span class="sd"> TooManyRequests (HTTP 429), UserCodeTimeoutException, UserCodeQuotaException.</span>
<span class="sd"> It utilizes the decorator</span>
<span class="sd"> :func:`apache_beam.utils.retry.with_exponential_backoff`.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">pass</span>
<div class="viewcode-block" id="ExponentialBackOffRepeater.repeat">
<a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.ExponentialBackOffRepeater.repeat">[docs]</a>
<span class="nd">@retry</span><span class="o">.</span><span class="n">with_exponential_backoff</span><span class="p">(</span>
<span class="n">num_retries</span><span class="o">=</span><span class="mi">2</span><span class="p">,</span> <span class="n">retry_filter</span><span class="o">=</span><span class="n">retry_on_exception</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="nf">repeat</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span>
<span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">,</span>
<span class="n">timeout</span><span class="p">:</span> <span class="nb">float</span><span class="p">,</span>
<span class="n">metrics_collector</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">_MetricsCollector</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">ResponseT</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;repeat method is called from the RequestResponseIO when</span>
<span class="sd"> a repeater is enabled.</span>
<span class="sd"> Args:</span>
<span class="sd"> caller: a `~apache_beam.io.requestresponse.Caller` object that</span>
<span class="sd"> calls the API.</span>
<span class="sd"> request: input request to repeat.</span>
<span class="sd"> timeout: time to wait for the request to complete.</span>
<span class="sd"> metrics_collector: (Optional) a</span>
<span class="sd"> `~apache_beam.io.requestresponse._MetricsCollector` object to</span>
<span class="sd"> collect the metrics for RequestResponseIO.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">_execute_request</span><span class="p">(</span><span class="n">caller</span><span class="p">,</span> <span class="n">request</span><span class="p">,</span> <span class="n">timeout</span><span class="p">,</span> <span class="n">metrics_collector</span><span class="p">)</span></div>
</div>
<div class="viewcode-block" id="NoOpsRepeater">
<a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.NoOpsRepeater">[docs]</a>
<span class="k">class</span><span class="w"> </span><span class="nc">NoOpsRepeater</span><span class="p">(</span><span class="n">Repeater</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Executes a request just once irrespective of any exception.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="NoOpsRepeater.repeat">
<a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.NoOpsRepeater.repeat">[docs]</a>
<span class="k">def</span><span class="w"> </span><span class="nf">repeat</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span>
<span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">,</span>
<span class="n">timeout</span><span class="p">:</span> <span class="nb">float</span><span class="p">,</span>
<span class="n">metrics_collector</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">_MetricsCollector</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">ResponseT</span><span class="p">:</span>
<span class="k">return</span> <span class="n">_execute_request</span><span class="p">(</span><span class="n">caller</span><span class="p">,</span> <span class="n">request</span><span class="p">,</span> <span class="n">timeout</span><span class="p">,</span> <span class="n">metrics_collector</span><span class="p">)</span></div>
</div>
<span class="k">class</span><span class="w"> </span><span class="nc">PreCallThrottler</span><span class="p">(</span><span class="n">abc</span><span class="o">.</span><span class="n">ABC</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Provides a throttle mechanism before sending request.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<div class="viewcode-block" id="DefaultThrottler">
<a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.DefaultThrottler">[docs]</a>
<span class="k">class</span><span class="w"> </span><span class="nc">DefaultThrottler</span><span class="p">(</span><span class="n">PreCallThrottler</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Default throttler that uses</span>
<span class="sd"> :class:`apache_beam.io.components.adaptive_throttler.AdaptiveThrottler`</span>
<span class="sd"> Args:</span>
<span class="sd"> window_ms (int): length of history to consider, in ms, to set throttling.</span>
<span class="sd"> bucket_ms (int): granularity of time buckets that we store data in, in ms.</span>
<span class="sd"> overload_ratio (float): the target ratio between requests sent and</span>
<span class="sd"> successful requests. This is &quot;K&quot; in the formula in</span>
<span class="sd"> https://landing.google.com/sre/book/chapters/handling-overload.html.</span>
<span class="sd"> delay_secs (int): minimum number of seconds to throttle a request.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">window_ms</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">1</span><span class="p">,</span>
<span class="n">bucket_ms</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">1</span><span class="p">,</span>
<span class="n">overload_ratio</span><span class="p">:</span> <span class="nb">float</span> <span class="o">=</span> <span class="mi">2</span><span class="p">,</span>
<span class="n">delay_secs</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">5</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">throttler</span> <span class="o">=</span> <span class="n">AdaptiveThrottler</span><span class="p">(</span>
<span class="n">window_ms</span><span class="o">=</span><span class="n">window_ms</span><span class="p">,</span> <span class="n">bucket_ms</span><span class="o">=</span><span class="n">bucket_ms</span><span class="p">,</span> <span class="n">overload_ratio</span><span class="o">=</span><span class="n">overload_ratio</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">delay_secs</span> <span class="o">=</span> <span class="n">delay_secs</span></div>
<span class="k">class</span><span class="w"> </span><span class="nc">_FilterCacheReadFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A `DoFn` that partitions cache reads.</span>
<span class="sd"> It emits to main output for successful cache read requests or</span>
<span class="sd"> to the tagged output - `cache_misses` - otherwise.&quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">:</span> <span class="n">Tuple</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]:</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span><span class="s1">&#39;cache_misses&#39;</span><span class="p">,</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">element</span>
<span class="k">class</span><span class="w"> </span><span class="nc">_Call</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">RequestT</span><span class="p">],</span>
<span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]]):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;(Internal-only) PTransform that invokes a remote function on each element</span>
<span class="sd"> of the input PCollection.</span>
<span class="sd"> This PTransform uses a `Caller` object to invoke the actual API calls,</span>
<span class="sd"> and uses ``__enter__`` and ``__exit__`` to manage setup and teardown of</span>
<span class="sd"> clients when applicable. Additionally, a timeout value is specified to</span>
<span class="sd"> regulate the duration of each call, defaults to 30 seconds.</span>
<span class="sd"> Args:</span>
<span class="sd"> caller: a `Caller` object that invokes API call.</span>
<span class="sd"> timeout (float): timeout value in seconds to wait for response from API.</span>
<span class="sd"> should_backoff: (Optional) provides methods for backoff.</span>
<span class="sd"> repeater: (Optional) provides methods to repeat requests to API.</span>
<span class="sd"> throttler: (Optional) provides methods to pre-throttle a request.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span>
<span class="n">timeout</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">float</span><span class="p">]</span> <span class="o">=</span> <span class="n">DEFAULT_TIMEOUT_SECS</span><span class="p">,</span>
<span class="n">should_backoff</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">ShouldBackOff</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">repeater</span><span class="p">:</span> <span class="n">Repeater</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">throttler</span><span class="p">:</span> <span class="n">PreCallThrottler</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_caller</span> <span class="o">=</span> <span class="n">caller</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span> <span class="o">=</span> <span class="n">timeout</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_should_backoff</span> <span class="o">=</span> <span class="n">should_backoff</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span> <span class="o">=</span> <span class="n">repeater</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span> <span class="o">=</span> <span class="n">throttler</span>
<span class="k">def</span><span class="w"> </span><span class="nf">expand</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">requests</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">RequestT</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]:</span>
<span class="k">return</span> <span class="n">requests</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_CallDoFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="p">))</span>
<span class="k">class</span><span class="w"> </span><span class="nc">_CallDoFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span><span class="w"> </span><span class="nf">setup</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="o">.</span><span class="fm">__enter__</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span> <span class="o">=</span> <span class="n">_MetricsCollector</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="o">.</span><span class="fm">__str__</span><span class="p">())</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="o">.</span><span class="n">setup_counter</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span>
<span class="n">timeout</span><span class="p">:</span> <span class="nb">float</span><span class="p">,</span>
<span class="n">repeater</span><span class="p">:</span> <span class="n">Repeater</span><span class="p">,</span>
<span class="n">throttler</span><span class="p">:</span> <span class="n">PreCallThrottler</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_caller</span> <span class="o">=</span> <span class="n">caller</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span> <span class="o">=</span> <span class="n">timeout</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span> <span class="o">=</span> <span class="n">repeater</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span> <span class="o">=</span> <span class="n">throttler</span>
<span class="k">def</span><span class="w"> </span><span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="o">.</span><span class="n">requests</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span>
<span class="n">is_throttled_request</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="p">:</span>
<span class="k">while</span> <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="o">.</span><span class="n">throttler</span><span class="o">.</span><span class="n">throttle_request</span><span class="p">(</span><span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">*</span>
<span class="n">MSEC_TO_SEC</span><span class="p">):</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s2">&quot;Delaying request for </span><span class="si">%d</span><span class="s2"> seconds&quot;</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="o">.</span><span class="n">delay_secs</span><span class="p">)</span>
<span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="o">.</span><span class="n">delay_secs</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="o">.</span><span class="n">throttled_secs</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="o">.</span><span class="n">delay_secs</span><span class="p">)</span>
<span class="n">is_throttled_request</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">if</span> <span class="n">is_throttled_request</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="o">.</span><span class="n">throttled_requests</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">req_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span>
<span class="n">response</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span><span class="o">.</span><span class="n">repeat</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="p">,</span> <span class="n">request</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="o">.</span><span class="n">responses</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="o">.</span><span class="n">throttler</span><span class="o">.</span><span class="n">successful_request</span><span class="p">(</span><span class="n">req_time</span> <span class="o">*</span> <span class="n">MSEC_TO_SEC</span><span class="p">)</span>
<span class="k">yield</span> <span class="n">response</span>
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">e</span>
<span class="k">def</span><span class="w"> </span><span class="nf">teardown</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="o">.</span><span class="n">teardown_counter</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="o">.</span><span class="fm">__exit__</span><span class="p">(</span><span class="o">*</span><span class="n">sys</span><span class="o">.</span><span class="n">exc_info</span><span class="p">())</span>
<span class="k">class</span><span class="w"> </span><span class="nc">Cache</span><span class="p">(</span><span class="n">abc</span><span class="o">.</span><span class="n">ABC</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Base Cache class for</span>
<span class="sd"> :class:`apache_beam.io.requestresponse.RequestResponseIO`.</span>
<span class="sd"> For adding cache support to RequestResponseIO, implement this class.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span>
<span class="k">def</span><span class="w"> </span><span class="nf">get_read</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;returns a PTransform that reads from the cache.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span>
<span class="k">def</span><span class="w"> </span><span class="nf">get_write</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;returns a PTransform that writes to the cache.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="nd">@property</span>
<span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span>
<span class="k">def</span><span class="w"> </span><span class="nf">request_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;request coder to use with Cache.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="nd">@request_coder</span><span class="o">.</span><span class="n">setter</span>
<span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span>
<span class="k">def</span><span class="w"> </span><span class="nf">request_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">request_coder</span><span class="p">:</span> <span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;sets the request coder to use with Cache.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="nd">@property</span>
<span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span>
<span class="k">def</span><span class="w"> </span><span class="nf">source_caller</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Actual caller that is using the cache.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="nd">@source_caller</span><span class="o">.</span><span class="n">setter</span>
<span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span>
<span class="k">def</span><span class="w"> </span><span class="nf">source_caller</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Sets the source caller for</span>
<span class="sd"> :class:`apache_beam.io.requestresponse.RequestResponseIO` to pull</span>
<span class="sd"> cache request key from respective callers.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="k">class</span><span class="w"> </span><span class="nc">_RedisMode</span><span class="p">(</span><span class="n">enum</span><span class="o">.</span><span class="n">Enum</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Mode of operation for redis cache when using</span>
<span class="sd"> `~apache_beam.io.requestresponse._RedisCaller`.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">READ</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">WRITE</span> <span class="o">=</span> <span class="mi">1</span>
<span class="k">class</span><span class="w"> </span><span class="nc">_RedisCaller</span><span class="p">(</span><span class="n">Caller</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;An implementation of</span>
<span class="sd"> `~apache_beam.io.requestresponse.Caller` for Redis client.</span>
<span class="sd"> It provides the functionality for making requests to Redis server using</span>
<span class="sd"> :class:`apache_beam.io.requestresponse.RequestResponseIO`.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">host</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">port</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">time_to_live</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="n">timedelta</span><span class="p">],</span>
<span class="o">*</span><span class="p">,</span>
<span class="n">request_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">],</span>
<span class="n">response_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">],</span>
<span class="n">kwargs</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">source_caller</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Caller</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">mode</span><span class="p">:</span> <span class="n">_RedisMode</span><span class="p">,</span>
<span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Args:</span>
<span class="sd"> host (str): The hostname or IP address of the Redis server.</span>
<span class="sd"> port (int): The port number of the Redis server.</span>
<span class="sd"> time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for</span>
<span class="sd"> records stored in Redis. Provide an integer (in seconds) or a</span>
<span class="sd"> `datetime.timedelta` object.</span>
<span class="sd"> request_coder: (Optional[`coders.Coder`]) coder for requests stored</span>
<span class="sd"> in Redis.</span>
<span class="sd"> response_coder: (Optional[`coders.Coder`]) coder for decoding responses</span>
<span class="sd"> received from Redis.</span>
<span class="sd"> kwargs: Optional(Dict[str, Any]) additional keyword arguments that</span>
<span class="sd"> are required to connect to your redis server. Same as `redis.Redis()`.</span>
<span class="sd"> source_caller: (Optional[`Caller`]): The source caller using this Redis</span>
<span class="sd"> cache in case of fetching the cache request to store in Redis.</span>
<span class="sd"> mode: `_RedisMode` An enum type specifying the operational mode of</span>
<span class="sd"> the `_RedisCaller`.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">host</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">port</span> <span class="o">=</span> <span class="n">host</span><span class="p">,</span> <span class="n">port</span>
<span class="bp">self</span><span class="o">.</span><span class="n">time_to_live</span> <span class="o">=</span> <span class="n">time_to_live</span>
<span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span> <span class="o">=</span> <span class="n">request_coder</span>
<span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span> <span class="o">=</span> <span class="n">response_coder</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">source_caller</span> <span class="o">=</span> <span class="n">source_caller</span>
<span class="bp">self</span><span class="o">.</span><span class="n">mode</span> <span class="o">=</span> <span class="n">mode</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">redis</span><span class="o">.</span><span class="n">Redis</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">host</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">port</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="nf">_read_cache</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="n">cache_request</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">source_caller</span><span class="o">.</span><span class="n">get_cache_key</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="c1"># check if the caller is a enrichment handler. EnrichmentHandler</span>
<span class="c1"># provides the request format for cache.</span>
<span class="k">if</span> <span class="n">cache_request</span><span class="p">:</span>
<span class="n">encoded_request</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">cache_request</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">encoded_request</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="n">encoded_response</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">encoded_request</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">encoded_response</span><span class="p">:</span>
<span class="c1"># no cache entry present for this request.</span>
<span class="k">return</span> <span class="n">element</span><span class="p">,</span> <span class="kc">None</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">response_dict</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">encoded_response</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">))</span>
<span class="n">response</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">Row</span><span class="p">(</span><span class="o">**</span><span class="n">response_dict</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;cannot decode response from redis cache for </span><span class="si">%s</span><span class="s1">.&#39;</span> <span class="o">%</span> <span class="n">element</span><span class="p">)</span>
<span class="k">return</span> <span class="n">element</span><span class="p">,</span> <span class="kc">None</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">response</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="n">encoded_response</span><span class="p">)</span>
<span class="k">return</span> <span class="n">element</span><span class="p">,</span> <span class="n">response</span>
<span class="k">def</span><span class="w"> </span><span class="nf">_write_cache</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="n">cache_request</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">source_caller</span><span class="o">.</span><span class="n">get_cache_key</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="k">if</span> <span class="n">cache_request</span><span class="p">:</span>
<span class="n">encoded_request</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">cache_request</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">encoded_request</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">encoded_response</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">_asdict</span><span class="p">())</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;cannot encode response </span><span class="si">%s</span><span class="s1"> for </span><span class="si">%s</span><span class="s1"> to store in &#39;</span>
<span class="s1">&#39;redis cache.&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">],</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]))</span>
<span class="k">return</span> <span class="n">element</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">encoded_response</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span>
<span class="c1"># Write to cache with TTL. Set nx to True to prevent overwriting for the</span>
<span class="c1"># same key.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">set</span><span class="p">(</span>
<span class="n">encoded_request</span><span class="p">,</span> <span class="n">encoded_response</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">time_to_live</span><span class="p">,</span> <span class="n">nx</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="k">return</span> <span class="n">element</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">mode</span> <span class="o">==</span> <span class="n">_RedisMode</span><span class="o">.</span><span class="n">READ</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">List</span><span class="p">):</span>
<span class="n">responses</span> <span class="o">=</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_cache</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">element</span><span class="p">]</span>
<span class="k">return</span> <span class="n">responses</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_cache</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">List</span><span class="p">):</span>
<span class="n">responses</span> <span class="o">=</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">_write_cache</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">element</span><span class="p">]</span>
<span class="k">return</span> <span class="n">responses</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_write_cache</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exc_type</span><span class="p">,</span> <span class="n">exc_val</span><span class="p">,</span> <span class="n">exc_tb</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="k">class</span><span class="w"> </span><span class="nc">_ReadFromRedis</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">RequestT</span><span class="p">],</span>
<span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]]):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A `PTransform` that performs Redis cache read.&quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">host</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">port</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">time_to_live</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="n">timedelta</span><span class="p">],</span>
<span class="o">*</span><span class="p">,</span>
<span class="n">kwargs</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">request_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">],</span>
<span class="n">response_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">],</span>
<span class="n">source_caller</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Args:</span>
<span class="sd"> host (str): The hostname or IP address of the Redis server.</span>
<span class="sd"> port (int): The port number of the Redis server.</span>
<span class="sd"> time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for</span>
<span class="sd"> records stored in Redis. Provide an integer (in seconds) or a</span>
<span class="sd"> `datetime.timedelta` object.</span>
<span class="sd"> kwargs: Optional(Dict[str, Any]) additional keyword arguments that</span>
<span class="sd"> are required to connect to your redis server. Same as `redis.Redis()`.</span>
<span class="sd"> request_coder: (Optional[`coders.Coder`]) coder for requests stored</span>
<span class="sd"> in Redis.</span>
<span class="sd"> response_coder: (Optional[`coders.Coder`]) coder for decoding responses</span>
<span class="sd"> received from Redis.</span>
<span class="sd"> source_caller: (Optional[`Caller`]): The source caller using this Redis</span>
<span class="sd"> cache in case of fetching the cache request to store in Redis.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span> <span class="o">=</span> <span class="n">request_coder</span>
<span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span> <span class="o">=</span> <span class="n">response_coder</span>
<span class="bp">self</span><span class="o">.</span><span class="n">redis_caller</span> <span class="o">=</span> <span class="n">_RedisCaller</span><span class="p">(</span>
<span class="n">host</span><span class="p">,</span>
<span class="n">port</span><span class="p">,</span>
<span class="n">time_to_live</span><span class="p">,</span>
<span class="n">request_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span><span class="p">,</span>
<span class="n">response_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span><span class="p">,</span>
<span class="n">kwargs</span><span class="o">=</span><span class="n">kwargs</span><span class="p">,</span>
<span class="n">source_caller</span><span class="o">=</span><span class="n">source_caller</span><span class="p">,</span>
<span class="n">mode</span><span class="o">=</span><span class="n">_RedisMode</span><span class="o">.</span><span class="n">READ</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="nf">expand</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">requests</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">RequestT</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]:</span>
<span class="k">return</span> <span class="n">requests</span> <span class="o">|</span> <span class="n">RequestResponseIO</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">redis_caller</span><span class="p">)</span>
<span class="k">class</span><span class="w"> </span><span class="nc">_WriteToRedis</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span>
<span class="n">ResponseT</span><span class="p">]],</span>
<span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]]):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A `PTransfrom` that performs write to Redis cache.&quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">host</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">port</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">time_to_live</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="n">timedelta</span><span class="p">],</span>
<span class="o">*</span><span class="p">,</span>
<span class="n">kwargs</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">request_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">],</span>
<span class="n">response_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">],</span>
<span class="n">source_caller</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Args:</span>
<span class="sd"> host (str): The hostname or IP address of the Redis server.</span>
<span class="sd"> port (int): The port number of the Redis server.</span>
<span class="sd"> time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for</span>
<span class="sd"> records stored in Redis. Provide an integer (in seconds) or a</span>
<span class="sd"> `datetime.timedelta` object.</span>
<span class="sd"> kwargs: Optional(Dict[str, Any]) additional keyword arguments that</span>
<span class="sd"> are required to connect to your redis server. Same as `redis.Redis()`.</span>
<span class="sd"> request_coder: (Optional[`coders.Coder`]) coder for requests stored</span>
<span class="sd"> in Redis.</span>
<span class="sd"> response_coder: (Optional[`coders.Coder`]) coder for decoding responses</span>
<span class="sd"> received from Redis.</span>
<span class="sd"> source_caller: (Optional[`Caller`]): The source caller using this Redis</span>
<span class="sd"> cache in case of fetching the cache request to store in Redis.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span> <span class="o">=</span> <span class="n">request_coder</span>
<span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span> <span class="o">=</span> <span class="n">response_coder</span>
<span class="bp">self</span><span class="o">.</span><span class="n">redis_caller</span> <span class="o">=</span> <span class="n">_RedisCaller</span><span class="p">(</span>
<span class="n">host</span><span class="p">,</span>
<span class="n">port</span><span class="p">,</span>
<span class="n">time_to_live</span><span class="p">,</span>
<span class="n">request_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span><span class="p">,</span>
<span class="n">response_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span><span class="p">,</span>
<span class="n">kwargs</span><span class="o">=</span><span class="n">kwargs</span><span class="p">,</span>
<span class="n">source_caller</span><span class="o">=</span><span class="n">source_caller</span><span class="p">,</span>
<span class="n">mode</span><span class="o">=</span><span class="n">_RedisMode</span><span class="o">.</span><span class="n">WRITE</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="nf">expand</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">elements</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">]]</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]:</span>
<span class="k">return</span> <span class="n">elements</span> <span class="o">|</span> <span class="n">RequestResponseIO</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">redis_caller</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="nf">ensure_coders_exist</span><span class="p">(</span><span class="n">request_coder</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;checks if the coder exists to encode the request for caching.&quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">request_coder</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;need request coder to be able to use &#39;</span>
<span class="s1">&#39;Cache with RequestResponseIO.&#39;</span><span class="p">)</span>
<div class="viewcode-block" id="RedisCache">
<a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.RedisCache">[docs]</a>
<span class="k">class</span><span class="w"> </span><span class="nc">RedisCache</span><span class="p">(</span><span class="n">Cache</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Configure cache using Redis for</span>
<span class="sd"> :class:`apache_beam.io.requestresponse.RequestResponseIO`.&quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">host</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">port</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">time_to_live</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="n">timedelta</span><span class="p">]</span> <span class="o">=</span> <span class="n">DEFAULT_CACHE_ENTRY_TTL_SEC</span><span class="p">,</span>
<span class="o">*</span><span class="p">,</span>
<span class="n">request_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">response_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="o">**</span><span class="n">kwargs</span><span class="p">,</span>
<span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Args:</span>
<span class="sd"> host (str): The hostname or IP address of the Redis server.</span>
<span class="sd"> port (int): The port number of the Redis server.</span>
<span class="sd"> time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for</span>
<span class="sd"> records stored in Redis. Provide an integer (in seconds) or a</span>
<span class="sd"> `datetime.timedelta` object.</span>
<span class="sd"> request_coder: (Optional[`coders.Coder`]) coder for encoding requests.</span>
<span class="sd"> response_coder: (Optional[`coders.Coder`]) coder for decoding responses</span>
<span class="sd"> received from Redis.</span>
<span class="sd"> kwargs: Optional additional keyword arguments that</span>
<span class="sd"> are required to connect to your redis server. Same as `redis.Redis()`.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_host</span> <span class="o">=</span> <span class="n">host</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_port</span> <span class="o">=</span> <span class="n">port</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_time_to_live</span> <span class="o">=</span> <span class="n">time_to_live</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span> <span class="o">=</span> <span class="n">request_coder</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_response_coder</span> <span class="o">=</span> <span class="n">response_coder</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span> <span class="o">=</span> <span class="n">kwargs</span> <span class="k">if</span> <span class="n">kwargs</span> <span class="k">else</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source_caller</span> <span class="o">=</span> <span class="kc">None</span>
<div class="viewcode-block" id="RedisCache.get_read">
<a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.RedisCache.get_read">[docs]</a>
<span class="k">def</span><span class="w"> </span><span class="nf">get_read</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;get_read returns a PTransform for reading from the cache.&quot;&quot;&quot;</span>
<span class="n">ensure_coders_exist</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_ReadFromRedis</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_host</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_port</span><span class="p">,</span>
<span class="n">time_to_live</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_time_to_live</span><span class="p">,</span>
<span class="n">kwargs</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="p">,</span>
<span class="n">request_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span><span class="p">,</span>
<span class="n">response_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_response_coder</span><span class="p">,</span>
<span class="n">source_caller</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_source_caller</span><span class="p">)</span></div>
<div class="viewcode-block" id="RedisCache.get_write">
<a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.RedisCache.get_write">[docs]</a>
<span class="k">def</span><span class="w"> </span><span class="nf">get_write</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;returns a PTransform for writing to the cache.&quot;&quot;&quot;</span>
<span class="n">ensure_coders_exist</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_WriteToRedis</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_host</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_port</span><span class="p">,</span>
<span class="n">time_to_live</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_time_to_live</span><span class="p">,</span>
<span class="n">kwargs</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="p">,</span>
<span class="n">request_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span><span class="p">,</span>
<span class="n">response_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_response_coder</span><span class="p">,</span>
<span class="n">source_caller</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_source_caller</span><span class="p">)</span></div>
<span class="nd">@property</span>
<span class="k">def</span><span class="w"> </span><span class="nf">source_caller</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_caller</span>
<span class="nd">@source_caller</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span><span class="w"> </span><span class="nf">source_caller</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source_caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source_caller</span> <span class="o">=</span> <span class="n">source_caller</span>
<span class="nd">@property</span>
<span class="k">def</span><span class="w"> </span><span class="nf">request_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span>
<span class="nd">@request_coder</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span><span class="w"> </span><span class="nf">request_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">request_coder</span><span class="p">:</span> <span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span> <span class="o">=</span> <span class="n">request_coder</span></div>
<span class="k">class</span><span class="w"> </span><span class="nc">FlattenBatch</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Flatten a batched PCollection.&quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">elements</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">for</span> <span class="n">element</span> <span class="ow">in</span> <span class="n">elements</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">element</span>
<div class="viewcode-block" id="RequestResponseIO">
<a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.RequestResponseIO">[docs]</a>
<span class="k">class</span><span class="w"> </span><span class="nc">RequestResponseIO</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">RequestT</span><span class="p">],</span>
<span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]]):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A :class:`RequestResponseIO` transform to read and write to APIs.</span>
<span class="sd"> Processes an input :class:`~apache_beam.pvalue.PCollection` of requests</span>
<span class="sd"> by making a call to the API as defined in `Caller`&#39;s `__call__` method</span>
<span class="sd"> and returns a :class:`~apache_beam.pvalue.PCollection` of responses.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span>
<span class="n">timeout</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">float</span><span class="p">]</span> <span class="o">=</span> <span class="n">DEFAULT_TIMEOUT_SECS</span><span class="p">,</span>
<span class="n">should_backoff</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">ShouldBackOff</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">repeater</span><span class="p">:</span> <span class="n">Repeater</span> <span class="o">=</span> <span class="n">ExponentialBackOffRepeater</span><span class="p">(),</span>
<span class="n">cache</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Cache</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">throttler</span><span class="p">:</span> <span class="n">PreCallThrottler</span> <span class="o">=</span> <span class="n">DefaultThrottler</span><span class="p">(),</span>
<span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Instantiates a RequestResponseIO transform.</span>
<span class="sd"> Args:</span>
<span class="sd"> caller: an implementation of</span>
<span class="sd"> `Caller` object that makes call to the API.</span>
<span class="sd"> timeout (float): timeout value in seconds to wait for response from API.</span>
<span class="sd"> should_backoff: (Optional) provides methods for backoff.</span>
<span class="sd"> repeater: provides method to repeat failed requests to API due to service</span>
<span class="sd"> errors. Defaults to</span>
<span class="sd"> :class:`apache_beam.io.requestresponse.ExponentialBackOffRepeater` to</span>
<span class="sd"> repeat requests with exponential backoff.</span>
<span class="sd"> cache: (Optional) a `~apache_beam.io.requestresponse.Cache` object</span>
<span class="sd"> to use the appropriate cache.</span>
<span class="sd"> throttler: provides methods to pre-throttle a request. Defaults to</span>
<span class="sd"> :class:`apache_beam.io.requestresponse.DefaultThrottler` for</span>
<span class="sd"> client-side adaptive throttling using</span>
<span class="sd"> :class:`apache_beam.io.components.adaptive_throttler.AdaptiveThrottler`</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_caller</span> <span class="o">=</span> <span class="n">caller</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span> <span class="o">=</span> <span class="n">timeout</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_should_backoff</span> <span class="o">=</span> <span class="n">should_backoff</span>
<span class="k">if</span> <span class="n">repeater</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span> <span class="o">=</span> <span class="n">repeater</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span> <span class="o">=</span> <span class="n">NoOpsRepeater</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_cache</span> <span class="o">=</span> <span class="n">cache</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span> <span class="o">=</span> <span class="n">throttler</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_batching_kwargs</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="o">.</span><span class="n">batch_elements_kwargs</span><span class="p">()</span>
<div class="viewcode-block" id="RequestResponseIO.expand">
<a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.RequestResponseIO.expand">[docs]</a>
<span class="k">def</span><span class="w"> </span><span class="nf">expand</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">requests</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">RequestT</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]:</span>
<span class="c1"># TODO(riteshghorse): handle Throttle PTransforms when available.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cache</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_cache</span><span class="o">.</span><span class="n">source_caller</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_caller</span>
<span class="n">inputs</span> <span class="o">=</span> <span class="n">requests</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cache</span><span class="p">:</span>
<span class="c1"># read from cache.</span>
<span class="n">outputs</span> <span class="o">=</span> <span class="n">inputs</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cache</span><span class="o">.</span><span class="n">get_read</span><span class="p">()</span>
<span class="c1"># filter responses that are None and send them to the Call transform</span>
<span class="c1"># to fetch a value from external service.</span>
<span class="n">cached_responses</span><span class="p">,</span> <span class="n">inputs</span> <span class="o">=</span> <span class="p">(</span><span class="n">outputs</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">_FilterCacheReadFn</span><span class="p">()</span>
<span class="p">)</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span>
<span class="s1">&#39;cache_misses&#39;</span><span class="p">,</span> <span class="n">main</span><span class="o">=</span><span class="s1">&#39;cached_responses&#39;</span><span class="p">))</span>
<span class="c1"># Batch elements if batching is enabled.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batching_kwargs</span><span class="p">:</span>
<span class="n">inputs</span> <span class="o">=</span> <span class="n">inputs</span> <span class="o">|</span> <span class="n">BatchElements</span><span class="p">(</span><span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">_batching_kwargs</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="p">,</span> <span class="n">DefaultThrottler</span><span class="p">):</span>
<span class="c1"># DefaultThrottler applies throttling in the DoFn of</span>
<span class="c1"># Call PTransform.</span>
<span class="n">responses</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">inputs</span>
<span class="o">|</span> <span class="n">_Call</span><span class="p">(</span>
<span class="n">caller</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="p">,</span>
<span class="n">timeout</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span><span class="p">,</span>
<span class="n">should_backoff</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_should_backoff</span><span class="p">,</span>
<span class="n">repeater</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span><span class="p">,</span>
<span class="n">throttler</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># No throttling mechanism. The requests are made to the external source</span>
<span class="c1"># as they come.</span>
<span class="n">responses</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">inputs</span>
<span class="o">|</span> <span class="n">_Call</span><span class="p">(</span>
<span class="n">caller</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="p">,</span>
<span class="n">timeout</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span><span class="p">,</span>
<span class="n">should_backoff</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_should_backoff</span><span class="p">,</span>
<span class="n">repeater</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span><span class="p">))</span>
<span class="c1"># if batching is enabled then handle accordingly.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batching_kwargs</span><span class="p">:</span>
<span class="n">responses</span> <span class="o">=</span> <span class="n">responses</span> <span class="o">|</span> <span class="s2">&quot;FlattenBatch&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">FlattenBatch</span><span class="p">())</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cache</span><span class="p">:</span>
<span class="c1"># write to cache.</span>
<span class="n">_</span> <span class="o">=</span> <span class="n">responses</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cache</span><span class="o">.</span><span class="n">get_write</span><span class="p">()</span>
<span class="k">return</span> <span class="p">(</span><span class="n">cached_responses</span><span class="p">,</span> <span class="n">responses</span><span class="p">)</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">()</span>
<span class="k">return</span> <span class="n">responses</span></div>
</div>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>&#169; Copyright %Y, Apache Beam.</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a
<a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a>
provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>