| |
| |
| <!DOCTYPE html> |
| <html class="writer-html5" lang="en" data-content_root="../../../"> |
| <head> |
| <meta charset="utf-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <title>apache_beam.io.requestresponse — Apache Beam 2.67.0 documentation</title> |
| <!-- Stylesheets and relation links; every path is relative to the docs root (../../../). --> |
| <link rel="stylesheet" href="../../../_static/pygments.css?v=b86133f3"> |
| <link rel="stylesheet" href="../../../_static/css/theme.css?v=e59714d7"> |
| <link rel="index" href="../../../genindex.html" title="Index"> |
| <link rel="search" href="../../../search.html" title="Search"> |
| |
| <!-- Scripts stay in their original order: jquery.js is loaded first, theme.js last. --> |
| <script src="../../../_static/jquery.js?v=5d32c60e"></script> |
| <script src="../../../_static/_sphinx_javascript_frameworks_compat.js?v=2cd50e6c"></script> |
| <script src="../../../_static/documentation_options.js?v=959b4fbe"></script> |
| <script src="../../../_static/doctools.js?v=9a2dae69"></script> |
| <script src="../../../_static/sphinx_highlight.js?v=dc90522c"></script> |
| <script src="../../../_static/js/theme.js"></script> |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| <div class="wy-grid-for-nav"> |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> <!-- Sidebar: home link, search form, then the package and module link lists. --> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search" > |
| |
| |
| <!-- Home link back to the documentation index. --> |
| <a href="../../../index.html" class="icon icon-home"> |
| Apache Beam |
| </a> |
| <div role="search"> <!-- Search landmark: GET form that sends the query as "q", plus two hidden fields, to search.html. --> |
| <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get"> |
| <input type="text" name="q" placeholder="Search docs" aria-label="Search docs" /> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| </div> |
| </div><div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="Navigation menu"> |
| <ul> <!-- First list: links to the apache_beam.* package pages. --> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li> |
| </ul> |
| <ul> <!-- Second list: links to the top-level apache_beam.* module pages. --> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
| </ul> |
| |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"><nav class="wy-nav-top" aria-label="Mobile navigation menu" > <!-- Content wrapper opens here; the nav is the top bar labelled as the mobile navigation menu. --> |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> <!-- NOTE(review): presumably the menu toggle bound by the theme script via data-toggle; it has no accessible name and is not focusable. Confirm and fix in the theme. --> |
| <a href="../../../index.html">Apache Beam</a> |
| </nav> |
| |
| <div class="wy-nav-content"> |
| <div class="rst-content"> |
| <div role="navigation" aria-label="Page navigation"> <!-- Breadcrumb trail: Home, then Module code, then the current module. --> |
| <ul class="wy-breadcrumbs"> |
| <li><a href="../../../index.html" class="icon icon-home" aria-label="Home"></a></li> <!-- Icon-only link; aria-label supplies its accessible name. --> |
| <li class="breadcrumb-item"><a href="../../index.html">Module code</a></li> |
| <li class="breadcrumb-item active">apache_beam.io.requestresponse</li> <!-- Current page: plain text, not a link. --> |
| <li class="wy-breadcrumbs-aside"> <!-- Aside slot, empty on this page. --> |
| </li> |
| </ul> |
| <hr/> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.io.requestresponse</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="sd">"""``PTransform`` for reading from and writing to Web APIs."""</span> |
| <span class="kn">import</span><span class="w"> </span><span class="nn">abc</span> |
| <span class="kn">import</span><span class="w"> </span><span class="nn">concurrent.futures</span> |
| <span class="kn">import</span><span class="w"> </span><span class="nn">contextlib</span> |
| <span class="kn">import</span><span class="w"> </span><span class="nn">enum</span> |
| <span class="kn">import</span><span class="w"> </span><span class="nn">json</span> |
| <span class="kn">import</span><span class="w"> </span><span class="nn">logging</span> |
| <span class="kn">import</span><span class="w"> </span><span class="nn">sys</span> |
| <span class="kn">import</span><span class="w"> </span><span class="nn">time</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">datetime</span><span class="w"> </span><span class="kn">import</span> <span class="n">timedelta</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Any</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Dict</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Generic</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">List</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Mapping</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Optional</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Tuple</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">TypeVar</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">Union</span> |
| |
| <span class="kn">import</span><span class="w"> </span><span class="nn">redis</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">google.api_core.exceptions</span><span class="w"> </span><span class="kn">import</span> <span class="n">TooManyRequests</span> |
| |
| <span class="kn">import</span><span class="w"> </span><span class="nn">apache_beam</span><span class="w"> </span><span class="k">as</span><span class="w"> </span><span class="nn">beam</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam</span><span class="w"> </span><span class="kn">import</span> <span class="n">pvalue</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam.coders</span><span class="w"> </span><span class="kn">import</span> <span class="n">coders</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam.io.components.adaptive_throttler</span><span class="w"> </span><span class="kn">import</span> <span class="n">AdaptiveThrottler</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam.metrics</span><span class="w"> </span><span class="kn">import</span> <span class="n">Metrics</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam.transforms.util</span><span class="w"> </span><span class="kn">import</span> <span class="n">BatchElements</span> |
| <span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam.utils</span><span class="w"> </span><span class="kn">import</span> <span class="n">retry</span> |
| |
| <span class="n">RequestT</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s1">'RequestT'</span><span class="p">)</span> |
| <span class="n">ResponseT</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s1">'ResponseT'</span><span class="p">)</span> |
| |
| <span class="c1"># DEFAULT_TIMEOUT_SECS represents the time interval for completing the request</span> |
| <span class="c1"># with external source.</span> |
| <span class="n">DEFAULT_TIMEOUT_SECS</span> <span class="o">=</span> <span class="mi">30</span> |
| |
| <span class="c1"># DEFAULT_CACHE_ENTRY_TTL_SEC represents the total time-to-live</span> |
| <span class="c1"># for cache record.</span> |
| <span class="n">DEFAULT_CACHE_ENTRY_TTL_SEC</span> <span class="o">=</span> <span class="mi">24</span> <span class="o">*</span> <span class="mi">60</span> <span class="o">*</span> <span class="mi">60</span> |
| |
| <span class="n">MSEC_TO_SEC</span> <span class="o">=</span> <span class="mi">1000</span> |
| |
| <span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span> |
| |
| <span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="s1">'RequestResponseIO'</span><span class="p">,</span> |
| <span class="s1">'ExponentialBackOffRepeater'</span><span class="p">,</span> |
| <span class="s1">'DefaultThrottler'</span><span class="p">,</span> |
| <span class="s1">'NoOpsRepeater'</span><span class="p">,</span> |
| <span class="s1">'RedisCache'</span><span class="p">,</span> |
| <span class="p">]</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">UserCodeExecutionException</span><span class="p">(</span><span class="ne">Exception</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Base class for errors related to calling Web APIs."""</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">UserCodeQuotaException</span><span class="p">(</span><span class="n">UserCodeExecutionException</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Extends ``UserCodeExecutionException`` to signal specifically that</span> |
| <span class="sd"> the Web API client encountered a Quota or API overuse related error.</span> |
| <span class="sd"> """</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">UserCodeTimeoutException</span><span class="p">(</span><span class="n">UserCodeExecutionException</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Extends ``UserCodeExecutionException`` to signal a user code timeout."""</span> |
| |
| |
| <span class="k">def</span><span class="w"> </span><span class="nf">retry_on_exception</span><span class="p">(</span><span class="n">exception</span><span class="p">:</span> <span class="ne">Exception</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""retry on exceptions caused by unavailability of the remote server."""</span> |
| <span class="k">return</span> <span class="nb">isinstance</span><span class="p">(</span> |
| <span class="n">exception</span><span class="p">,</span> |
| <span class="p">(</span><span class="n">TooManyRequests</span><span class="p">,</span> <span class="n">UserCodeTimeoutException</span><span class="p">,</span> <span class="n">UserCodeQuotaException</span><span class="p">))</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">_MetricsCollector</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""A metrics collector that tracks RequestResponseIO related usage."""</span> |
| <span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">namespace</span><span class="p">:</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Args:</span> |
| <span class="sd"> namespace: Namespace for the metrics.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">requests</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">'requests'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">responses</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">'responses'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">failures</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">'failures'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">throttled_requests</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">'throttled_requests'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">throttled_secs</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span> |
| <span class="n">namespace</span><span class="p">,</span> <span class="s1">'cumulativeThrottlingSeconds'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">timeout_requests</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">'requests_timed_out'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">call_counter</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">'call_invocations'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">setup_counter</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">'setup_counter'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">teardown_counter</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">'teardown_counter'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">backoff_counter</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">'backoff_counter'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sleeper_counter</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="s1">'sleeper_counter'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">should_backoff_counter</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="n">counter</span><span class="p">(</span> |
| <span class="n">namespace</span><span class="p">,</span> <span class="s1">'should_backoff_counter'</span><span class="p">)</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">Caller</span><span class="p">(</span><span class="n">contextlib</span><span class="o">.</span><span class="n">AbstractContextManager</span><span class="p">,</span> |
| <span class="n">abc</span><span class="o">.</span><span class="n">ABC</span><span class="p">,</span> |
| <span class="n">Generic</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">]):</span> |
| <span class="w"> </span><span class="sd">"""Interface for user custom code intended for API calls.</span> |
| |
| <span class="sd"> For setup and teardown of clients when applicable, implement the</span> |
| <span class="sd"> ``__enter__`` and ``__exit__`` methods respectively."""</span> |
| <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span><span class="w"> </span><span class="fm">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> <span class="o">-></span> <span class="n">ResponseT</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""Calls a Web API with the ``RequestT`` and returns a</span> |
| <span class="sd"> ``ResponseT``. ``RequestResponseIO`` expects implementations of the</span> |
| <span class="sd"> ``__call__`` method to throw either a ``UserCodeExecutionException``,</span> |
| <span class="sd"> ``UserCodeQuotaException``, or ``UserCodeTimeoutException``.</span> |
| <span class="sd"> """</span> |
| <span class="k">pass</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="fm">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="fm">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exc_type</span><span class="p">,</span> <span class="n">exc_val</span><span class="p">,</span> <span class="n">exc_tb</span><span class="p">):</span> |
| <span class="k">return</span> <span class="kc">None</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="nf">get_cache_key</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">)</span> <span class="o">-></span> <span class="nb">str</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""Returns the request to be cached.</span> |
| |
| <span class="sd"> This is how the response will be looked up in the cache as well.</span> |
| <span class="sd"> By default, entire request is cached as the key for the cache.</span> |
| <span class="sd"> Implement this method to override the key for the cache.</span> |
| <span class="sd"> For example, in `BigTableEnrichmentHandler`, the row key for the element</span> |
| <span class="sd"> is returned here.</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="s2">""</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="nf">batch_elements_kwargs</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="n">Mapping</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""Returns a kwargs suitable for `beam.BatchElements`."""</span> |
| <span class="k">return</span> <span class="p">{}</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">ShouldBackOff</span><span class="p">(</span><span class="n">abc</span><span class="o">.</span><span class="n">ABC</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Provides mechanism to apply adaptive throttling.</span> |
| <span class="sd"> """</span> |
| <span class="k">pass</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">Repeater</span><span class="p">(</span><span class="n">abc</span><span class="o">.</span><span class="n">ABC</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Provides mechanism to repeat requests for a</span> |
| <span class="sd"> configurable condition."""</span> |
| <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">repeat</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span> |
| <span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">,</span> |
| <span class="n">timeout</span><span class="p">:</span> <span class="nb">float</span><span class="p">,</span> |
| <span class="n">metrics_collector</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">_MetricsCollector</span><span class="p">])</span> <span class="o">-></span> <span class="n">ResponseT</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""Implements a repeater strategy for RequestResponseIO when a repeater</span> |
| <span class="sd"> is enabled.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> caller: a `~apache_beam.io.requestresponse.Caller` object that</span> |
| <span class="sd"> calls the API.</span> |
| <span class="sd"> request: input request to repeat.</span> |
| <span class="sd"> timeout: time to wait for the request to complete.</span> |
| <span class="sd"> metrics_collector: (Optional) a</span> |
| <span class="sd"> `~apache_beam.io.requestresponse._MetricsCollector` object</span> |
| <span class="sd"> to collect the metrics for RequestResponseIO.</span> |
| <span class="sd"> """</span> |
| <span class="k">pass</span> |
| |
| |
| <span class="k">def</span><span class="w"> </span><span class="nf">_execute_request</span><span class="p">(</span> |
| <span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span> |
| <span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">,</span> |
| <span class="n">timeout</span><span class="p">:</span> <span class="nb">float</span><span class="p">,</span> |
| <span class="n">metrics_collector</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">_MetricsCollector</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-></span> <span class="n">ResponseT</span><span class="p">:</span> |
| <span class="k">with</span> <span class="n">concurrent</span><span class="o">.</span><span class="n">futures</span><span class="o">.</span><span class="n">ThreadPoolExecutor</span><span class="p">()</span> <span class="k">as</span> <span class="n">executor</span><span class="p">:</span> |
| <span class="n">future</span> <span class="o">=</span> <span class="n">executor</span><span class="o">.</span><span class="n">submit</span><span class="p">(</span><span class="n">caller</span><span class="p">,</span> <span class="n">request</span><span class="p">)</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">future</span><span class="o">.</span><span class="n">result</span><span class="p">(</span><span class="n">timeout</span><span class="o">=</span><span class="n">timeout</span><span class="p">)</span> |
| <span class="k">except</span> <span class="n">TooManyRequests</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s1">'request could not be completed. got code </span><span class="si">%i</span><span class="s1"> from the service.'</span><span class="p">,</span> |
| <span class="n">e</span><span class="o">.</span><span class="n">code</span><span class="p">)</span> |
| <span class="k">raise</span> <span class="n">e</span> |
| <span class="k">except</span> <span class="n">concurrent</span><span class="o">.</span><span class="n">futures</span><span class="o">.</span><span class="n">TimeoutError</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">metrics_collector</span><span class="p">:</span> |
| <span class="n">metrics_collector</span><span class="o">.</span><span class="n">timeout_requests</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> |
| <span class="k">raise</span> <span class="n">UserCodeTimeoutException</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s1">'Timeout </span><span class="si">{</span><span class="n">timeout</span><span class="si">}</span><span class="s1"> exceeded '</span> |
| <span class="sa">f</span><span class="s1">'while completing request: </span><span class="si">{</span><span class="n">request</span><span class="si">}</span><span class="s1">'</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">RuntimeError</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">metrics_collector</span><span class="p">:</span> |
| <span class="n">metrics_collector</span><span class="o">.</span><span class="n">failures</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> |
| <span class="k">raise</span> <span class="n">UserCodeExecutionException</span><span class="p">(</span><span class="s1">'could not complete request'</span><span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="ExponentialBackOffRepeater"> |
| <a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.ExponentialBackOffRepeater">[docs]</a> |
| <span class="k">class</span><span class="w"> </span><span class="nc">ExponentialBackOffRepeater</span><span class="p">(</span><span class="n">Repeater</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Configure exponential backoff retry strategy.</span> |
| |
| <span class="sd"> It retries for exceptions due to the remote service such as</span> |
| <span class="sd"> TooManyRequests (HTTP 429), UserCodeTimeoutException, UserCodeQuotaException.</span> |
| |
| <span class="sd"> It utilizes the decorator</span> |
| <span class="sd"> :func:`apache_beam.utils.retry.with_exponential_backoff`.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">pass</span> |
| |
| <div class="viewcode-block" id="ExponentialBackOffRepeater.repeat"> |
| <a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.ExponentialBackOffRepeater.repeat">[docs]</a> |
| <span class="nd">@retry</span><span class="o">.</span><span class="n">with_exponential_backoff</span><span class="p">(</span> |
| <span class="n">num_retries</span><span class="o">=</span><span class="mi">2</span><span class="p">,</span> <span class="n">retry_filter</span><span class="o">=</span><span class="n">retry_on_exception</span><span class="p">)</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">repeat</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span> |
| <span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">,</span> |
| <span class="n">timeout</span><span class="p">:</span> <span class="nb">float</span><span class="p">,</span> |
| <span class="n">metrics_collector</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">_MetricsCollector</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-></span> <span class="n">ResponseT</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""repeat method is called from the RequestResponseIO when</span> |
| <span class="sd"> a repeater is enabled.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> caller: a `~apache_beam.io.requestresponse.Caller` object that</span> |
| <span class="sd"> calls the API.</span> |
| <span class="sd"> request: input request to repeat.</span> |
| <span class="sd"> timeout: time to wait for the request to complete.</span> |
| <span class="sd"> metrics_collector: (Optional) a</span> |
| <span class="sd"> `~apache_beam.io.requestresponse._MetricsCollector` object to</span> |
| <span class="sd"> collect the metrics for RequestResponseIO.</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="n">_execute_request</span><span class="p">(</span><span class="n">caller</span><span class="p">,</span> <span class="n">request</span><span class="p">,</span> <span class="n">timeout</span><span class="p">,</span> <span class="n">metrics_collector</span><span class="p">)</span></div> |
| </div> |
| |
| |
| |
| <div class="viewcode-block" id="NoOpsRepeater"> |
| <a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.NoOpsRepeater">[docs]</a> |
| <span class="k">class</span><span class="w"> </span><span class="nc">NoOpsRepeater</span><span class="p">(</span><span class="n">Repeater</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Executes a request just once irrespective of any exception.</span> |
| <span class="sd"> """</span> |
| <div class="viewcode-block" id="NoOpsRepeater.repeat"> |
| <a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.NoOpsRepeater.repeat">[docs]</a> |
| <span class="k">def</span><span class="w"> </span><span class="nf">repeat</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span> |
| <span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">,</span> |
| <span class="n">timeout</span><span class="p">:</span> <span class="nb">float</span><span class="p">,</span> |
| <span class="n">metrics_collector</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">_MetricsCollector</span><span class="p">])</span> <span class="o">-></span> <span class="n">ResponseT</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">_execute_request</span><span class="p">(</span><span class="n">caller</span><span class="p">,</span> <span class="n">request</span><span class="p">,</span> <span class="n">timeout</span><span class="p">,</span> <span class="n">metrics_collector</span><span class="p">)</span></div> |
| </div> |
| |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">PreCallThrottler</span><span class="p">(</span><span class="n">abc</span><span class="o">.</span><span class="n">ABC</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Provides a throttle mechanism before sending request."""</span> |
| <span class="k">pass</span> |
| |
| |
| <div class="viewcode-block" id="DefaultThrottler"> |
| <a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.DefaultThrottler">[docs]</a> |
| <span class="k">class</span><span class="w"> </span><span class="nc">DefaultThrottler</span><span class="p">(</span><span class="n">PreCallThrottler</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Default throttler that uses</span> |
| <span class="sd"> :class:`apache_beam.io.components.adaptive_throttler.AdaptiveThrottler`</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> window_ms (int): length of history to consider, in ms, to set throttling.</span> |
| <span class="sd"> bucket_ms (int): granularity of time buckets that we store data in, in ms.</span> |
| <span class="sd"> overload_ratio (float): the target ratio between requests sent and</span> |
| <span class="sd"> successful requests. This is "K" in the formula in</span> |
| <span class="sd"> https://landing.google.com/sre/book/chapters/handling-overload.html.</span> |
| <span class="sd"> delay_secs (int): minimum number of seconds to throttle a request.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">window_ms</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">1</span><span class="p">,</span> |
| <span class="n">bucket_ms</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">1</span><span class="p">,</span> |
| <span class="n">overload_ratio</span><span class="p">:</span> <span class="nb">float</span> <span class="o">=</span> <span class="mi">2</span><span class="p">,</span> |
| <span class="n">delay_secs</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">5</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">throttler</span> <span class="o">=</span> <span class="n">AdaptiveThrottler</span><span class="p">(</span> |
| <span class="n">window_ms</span><span class="o">=</span><span class="n">window_ms</span><span class="p">,</span> <span class="n">bucket_ms</span><span class="o">=</span><span class="n">bucket_ms</span><span class="p">,</span> <span class="n">overload_ratio</span><span class="o">=</span><span class="n">overload_ratio</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">delay_secs</span> <span class="o">=</span> <span class="n">delay_secs</span></div> |
| |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">_FilterCacheReadFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A `DoFn` that partitions cache reads.</span> |
| |
| <span class="sd"> It emits to main output for successful cache read requests or</span> |
| <span class="sd"> to the tagged output - `cache_misses` - otherwise."""</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">:</span> <span class="n">Tuple</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]:</span> |
| <span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span><span class="s1">'cache_misses'</span><span class="p">,</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">element</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">_Call</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">RequestT</span><span class="p">],</span> |
| <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]]):</span> |
| <span class="w"> </span><span class="sd">"""(Internal-only) PTransform that invokes a remote function on each element</span> |
| <span class="sd"> of the input PCollection.</span> |
| |
| <span class="sd"> This PTransform uses a `Caller` object to invoke the actual API calls,</span> |
| <span class="sd"> and uses ``__enter__`` and ``__exit__`` to manage setup and teardown of</span> |
| <span class="sd"> clients when applicable. Additionally, a timeout value is specified to</span> |
| <span class="sd"> regulate the duration of each call; it defaults to 30 seconds.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> caller: a `Caller` object that invokes API call.</span> |
| <span class="sd"> timeout (float): timeout value in seconds to wait for response from API.</span> |
| <span class="sd"> should_backoff: (Optional) provides methods for backoff.</span> |
| <span class="sd"> repeater: (Optional) provides methods to repeat requests to API.</span> |
| <span class="sd"> throttler: (Optional) provides methods to pre-throttle a request.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span> |
| <span class="n">timeout</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">float</span><span class="p">]</span> <span class="o">=</span> <span class="n">DEFAULT_TIMEOUT_SECS</span><span class="p">,</span> |
| <span class="n">should_backoff</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">ShouldBackOff</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">repeater</span><span class="p">:</span> <span class="n">Repeater</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">throttler</span><span class="p">:</span> <span class="n">PreCallThrottler</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_caller</span> <span class="o">=</span> <span class="n">caller</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span> <span class="o">=</span> <span class="n">timeout</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_should_backoff</span> <span class="o">=</span> <span class="n">should_backoff</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span> <span class="o">=</span> <span class="n">repeater</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span> <span class="o">=</span> <span class="n">throttler</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="nf">expand</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">requests</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">RequestT</span><span class="p">])</span> <span class="o">-></span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]:</span> |
| <span class="k">return</span> <span class="n">requests</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">_CallDoFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="p">))</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">_CallDoFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">setup</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="o">.</span><span class="fm">__enter__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span> <span class="o">=</span> <span class="n">_MetricsCollector</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="o">.</span><span class="fm">__str__</span><span class="p">())</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="o">.</span><span class="n">setup_counter</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span> |
| <span class="n">timeout</span><span class="p">:</span> <span class="nb">float</span><span class="p">,</span> |
| <span class="n">repeater</span><span class="p">:</span> <span class="n">Repeater</span><span class="p">,</span> |
| <span class="n">throttler</span><span class="p">:</span> <span class="n">PreCallThrottler</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_caller</span> <span class="o">=</span> <span class="n">caller</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span> <span class="o">=</span> <span class="n">timeout</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span> <span class="o">=</span> <span class="n">repeater</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span> <span class="o">=</span> <span class="n">throttler</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">request</span><span class="p">:</span> <span class="n">RequestT</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="o">.</span><span class="n">requests</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> |
| |
| <span class="n">is_throttled_request</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="p">:</span> |
| <span class="k">while</span> <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="o">.</span><span class="n">throttler</span><span class="o">.</span><span class="n">throttle_request</span><span class="p">(</span><span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">*</span> |
| <span class="n">MSEC_TO_SEC</span><span class="p">):</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s2">"Delaying request for </span><span class="si">%d</span><span class="s2"> seconds"</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="o">.</span><span class="n">delay_secs</span><span class="p">)</span> |
| <span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="o">.</span><span class="n">delay_secs</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="o">.</span><span class="n">throttled_secs</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="o">.</span><span class="n">delay_secs</span><span class="p">)</span> |
| <span class="n">is_throttled_request</span> <span class="o">=</span> <span class="kc">True</span> |
| |
| <span class="k">if</span> <span class="n">is_throttled_request</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="o">.</span><span class="n">throttled_requests</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">req_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> |
| <span class="n">response</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span><span class="o">.</span><span class="n">repeat</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="p">,</span> <span class="n">request</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="o">.</span><span class="n">responses</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="o">.</span><span class="n">throttler</span><span class="o">.</span><span class="n">successful_request</span><span class="p">(</span><span class="n">req_time</span> <span class="o">*</span> <span class="n">MSEC_TO_SEC</span><span class="p">)</span> |
| <span class="k">yield</span> <span class="n">response</span> |
| <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">e</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="nf">teardown</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_metrics_collector</span><span class="o">.</span><span class="n">teardown_counter</span><span class="o">.</span><span class="n">inc</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="o">.</span><span class="fm">__exit__</span><span class="p">(</span><span class="o">*</span><span class="n">sys</span><span class="o">.</span><span class="n">exc_info</span><span class="p">())</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">Cache</span><span class="p">(</span><span class="n">abc</span><span class="o">.</span><span class="n">ABC</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Base Cache class for</span> |
| <span class="sd"> :class:`apache_beam.io.requestresponse.RequestResponseIO`.</span> |
| |
| <span class="sd"> For adding cache support to RequestResponseIO, implement this class.</span> |
| <span class="sd"> """</span> |
| <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">get_read</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""returns a PTransform that reads from the cache."""</span> |
| <span class="k">pass</span> |
| |
| <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">get_write</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""returns a PTransform that writes to the cache."""</span> |
| <span class="k">pass</span> |
| |
| <span class="nd">@property</span> |
| <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">request_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""request coder to use with Cache."""</span> |
| <span class="k">pass</span> |
| |
| <span class="nd">@request_coder</span><span class="o">.</span><span class="n">setter</span> |
| <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">request_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">request_coder</span><span class="p">:</span> <span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""sets the request coder to use with Cache."""</span> |
| <span class="k">pass</span> |
| |
| <span class="nd">@property</span> |
| <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">source_caller</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Actual caller that is using the cache."""</span> |
| <span class="k">pass</span> |
| |
| <span class="nd">@source_caller</span><span class="o">.</span><span class="n">setter</span> |
| <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">source_caller</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Sets the source caller for</span> |
| <span class="sd"> :class:`apache_beam.io.requestresponse.RequestResponseIO` to pull</span> |
| <span class="sd"> cache request key from respective callers."""</span> |
| <span class="k">pass</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">_RedisMode</span><span class="p">(</span><span class="n">enum</span><span class="o">.</span><span class="n">Enum</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Mode of operation for redis cache when using</span> |
| <span class="sd"> `~apache_beam.io.requestresponse._RedisCaller`.</span> |
| <span class="sd"> """</span> |
| <span class="n">READ</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="n">WRITE</span> <span class="o">=</span> <span class="mi">1</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">_RedisCaller</span><span class="p">(</span><span class="n">Caller</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""An implementation of</span> |
| <span class="sd"> `~apache_beam.io.requestresponse.Caller` for Redis client.</span> |
| |
| <span class="sd"> It provides the functionality for making requests to Redis server using</span> |
| <span class="sd"> :class:`apache_beam.io.requestresponse.RequestResponseIO`.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">host</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">port</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> |
| <span class="n">time_to_live</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="n">timedelta</span><span class="p">],</span> |
| <span class="o">*</span><span class="p">,</span> |
| <span class="n">request_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">],</span> |
| <span class="n">response_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">],</span> |
| <span class="n">kwargs</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">source_caller</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Caller</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">mode</span><span class="p">:</span> <span class="n">_RedisMode</span><span class="p">,</span> |
| <span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Args:</span> |
| <span class="sd"> host (str): The hostname or IP address of the Redis server.</span> |
| <span class="sd"> port (int): The port number of the Redis server.</span> |
| <span class="sd"> time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for</span> |
| <span class="sd"> records stored in Redis. Provide an integer (in seconds) or a</span> |
| <span class="sd"> `datetime.timedelta` object.</span> |
| <span class="sd"> request_coder: (Optional[`coders.Coder`]) coder for requests stored</span> |
| <span class="sd"> in Redis.</span> |
| <span class="sd"> response_coder: (Optional[`coders.Coder`]) coder for decoding responses</span> |
| <span class="sd"> received from Redis.</span> |
| <span class="sd"> kwargs: (Optional[Dict[str, Any]]) additional keyword arguments that</span> |
| <span class="sd"> are required to connect to your Redis server. Same as `redis.Redis()`.</span> |
| <span class="sd"> source_caller: (Optional[`Caller`]) The source caller using this Redis</span> |
| <span class="sd"> cache in case of fetching the cache request to store in Redis.</span> |
| <span class="sd"> mode: `_RedisMode` An enum type specifying the operational mode of</span> |
| <span class="sd"> the `_RedisCaller`.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">host</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">port</span> <span class="o">=</span> <span class="n">host</span><span class="p">,</span> <span class="n">port</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">time_to_live</span> <span class="o">=</span> <span class="n">time_to_live</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span> <span class="o">=</span> <span class="n">request_coder</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span> <span class="o">=</span> <span class="n">response_coder</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">source_caller</span> <span class="o">=</span> <span class="n">source_caller</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">mode</span> <span class="o">=</span> <span class="n">mode</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="fm">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">redis</span><span class="o">.</span><span class="n">Redis</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">host</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">port</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="nf">_read_cache</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="n">cache_request</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">source_caller</span><span class="o">.</span><span class="n">get_cache_key</span><span class="p">(</span><span class="n">element</span><span class="p">)</span> |
| <span class="c1"># check if the caller is an enrichment handler. EnrichmentHandler</span> |
| <span class="c1"># provides the request format for cache.</span> |
| <span class="k">if</span> <span class="n">cache_request</span><span class="p">:</span> |
| <span class="n">encoded_request</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">cache_request</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">encoded_request</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">element</span><span class="p">)</span> |
| |
| <span class="n">encoded_response</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">encoded_request</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">encoded_response</span><span class="p">:</span> |
| <span class="c1"># no cache entry present for this request.</span> |
| <span class="k">return</span> <span class="n">element</span><span class="p">,</span> <span class="kc">None</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">response_dict</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">encoded_response</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">))</span> |
| <span class="n">response</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">Row</span><span class="p">(</span><span class="o">**</span><span class="n">response_dict</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span> |
| <span class="s1">'cannot decode response from redis cache for </span><span class="si">%s</span><span class="s1">.'</span> <span class="o">%</span> <span class="n">element</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">element</span><span class="p">,</span> <span class="kc">None</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">response</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="n">encoded_response</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">element</span><span class="p">,</span> <span class="n">response</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="nf">_write_cache</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="n">cache_request</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">source_caller</span><span class="o">.</span><span class="n">get_cache_key</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> |
| <span class="k">if</span> <span class="n">cache_request</span><span class="p">:</span> |
| <span class="n">encoded_request</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">cache_request</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">encoded_request</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">encoded_response</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">_asdict</span><span class="p">())</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span> |
| <span class="s1">'cannot encode response </span><span class="si">%s</span><span class="s1"> for </span><span class="si">%s</span><span class="s1"> to store in '</span> |
| <span class="s1">'redis cache.'</span> <span class="o">%</span> <span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">],</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]))</span> |
| <span class="k">return</span> <span class="n">element</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">encoded_response</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span> |
| <span class="c1"># Write to cache with TTL. Set nx to True to prevent overwriting for the</span> |
| <span class="c1"># same key.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">set</span><span class="p">(</span> |
| <span class="n">encoded_request</span><span class="p">,</span> <span class="n">encoded_response</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">time_to_live</span><span class="p">,</span> <span class="n">nx</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">element</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="fm">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">mode</span> <span class="o">==</span> <span class="n">_RedisMode</span><span class="o">.</span><span class="n">READ</span><span class="p">:</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">List</span><span class="p">):</span> |
| <span class="n">responses</span> <span class="o">=</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_cache</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">element</span><span class="p">]</span> |
| <span class="k">return</span> <span class="n">responses</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_cache</span><span class="p">(</span><span class="n">element</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">List</span><span class="p">):</span> |
| <span class="n">responses</span> <span class="o">=</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">_write_cache</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">element</span><span class="p">]</span> |
| <span class="k">return</span> <span class="n">responses</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_write_cache</span><span class="p">(</span><span class="n">element</span><span class="p">)</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="fm">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exc_type</span><span class="p">,</span> <span class="n">exc_val</span><span class="p">,</span> <span class="n">exc_tb</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">_ReadFromRedis</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">RequestT</span><span class="p">],</span> |
| <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]]):</span> |
| <span class="w"> </span><span class="sd">"""A `PTransform` that performs Redis cache read."""</span> |
| <span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">host</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">port</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> |
| <span class="n">time_to_live</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="n">timedelta</span><span class="p">],</span> |
| <span class="o">*</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">request_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">],</span> |
| <span class="n">response_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">],</span> |
| <span class="n">source_caller</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Args:</span> |
| <span class="sd"> host (str): The hostname or IP address of the Redis server.</span> |
| <span class="sd"> port (int): The port number of the Redis server.</span> |
| <span class="sd"> time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for</span> |
| <span class="sd"> records stored in Redis. Provide an integer (in seconds) or a</span> |
| <span class="sd"> `datetime.timedelta` object.</span> |
| <span class="sd"> kwargs: Optional(Dict[str, Any]) additional keyword arguments that</span> |
| <span class="sd"> are required to connect to your redis server. Same as `redis.Redis()`.</span> |
| <span class="sd"> request_coder: (Optional[`coders.Coder`]) coder for requests stored</span> |
| <span class="sd"> in Redis.</span> |
| <span class="sd"> response_coder: (Optional[`coders.Coder`]) coder for decoding responses</span> |
| <span class="sd"> received from Redis.</span> |
| <span class="sd"> source_caller: (Optional[`Caller`]): The source caller using this Redis</span> |
| <span class="sd"> cache in case of fetching the cache request to store in Redis.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span> <span class="o">=</span> <span class="n">request_coder</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span> <span class="o">=</span> <span class="n">response_coder</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">redis_caller</span> <span class="o">=</span> <span class="n">_RedisCaller</span><span class="p">(</span> |
| <span class="n">host</span><span class="p">,</span> |
| <span class="n">port</span><span class="p">,</span> |
| <span class="n">time_to_live</span><span class="p">,</span> |
| <span class="n">request_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span><span class="p">,</span> |
| <span class="n">response_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="o">=</span><span class="n">kwargs</span><span class="p">,</span> |
| <span class="n">source_caller</span><span class="o">=</span><span class="n">source_caller</span><span class="p">,</span> |
| <span class="n">mode</span><span class="o">=</span><span class="n">_RedisMode</span><span class="o">.</span><span class="n">READ</span><span class="p">)</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="nf">expand</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">requests</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">RequestT</span><span class="p">])</span> <span class="o">-></span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]:</span> |
| <span class="k">return</span> <span class="n">requests</span> <span class="o">|</span> <span class="n">RequestResponseIO</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">redis_caller</span><span class="p">)</span> |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">_WriteToRedis</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> |
| <span class="n">ResponseT</span><span class="p">]],</span> |
| <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]]):</span> |
| <span class="w"> </span><span class="sd">"""A `PTransform` that performs write to Redis cache."""</span> |
| <span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">host</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">port</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> |
| <span class="n">time_to_live</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="n">timedelta</span><span class="p">],</span> |
| <span class="o">*</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">request_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">],</span> |
| <span class="n">response_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">],</span> |
| <span class="n">source_caller</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Args:</span> |
| <span class="sd"> host (str): The hostname or IP address of the Redis server.</span> |
| <span class="sd"> port (int): The port number of the Redis server.</span> |
| <span class="sd"> time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for</span> |
| <span class="sd"> records stored in Redis. Provide an integer (in seconds) or a</span> |
| <span class="sd"> `datetime.timedelta` object.</span> |
| <span class="sd"> kwargs: Optional(Dict[str, Any]) additional keyword arguments that</span> |
| <span class="sd"> are required to connect to your redis server. Same as `redis.Redis()`.</span> |
| <span class="sd"> request_coder: (Optional[`coders.Coder`]) coder for requests stored</span> |
| <span class="sd"> in Redis.</span> |
| <span class="sd"> response_coder: (Optional[`coders.Coder`]) coder for encoding responses</span> |
| <span class="sd"> written to Redis.</span> |
| <span class="sd"> source_caller: (Optional[`Caller`]): The source caller using this Redis</span> |
| <span class="sd"> cache in case of fetching the cache request to store in Redis.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span> <span class="o">=</span> <span class="n">request_coder</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span> <span class="o">=</span> <span class="n">response_coder</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">redis_caller</span> <span class="o">=</span> <span class="n">_RedisCaller</span><span class="p">(</span> |
| <span class="n">host</span><span class="p">,</span> |
| <span class="n">port</span><span class="p">,</span> |
| <span class="n">time_to_live</span><span class="p">,</span> |
| <span class="n">request_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">request_coder</span><span class="p">,</span> |
| <span class="n">response_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">response_coder</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="o">=</span><span class="n">kwargs</span><span class="p">,</span> |
| <span class="n">source_caller</span><span class="o">=</span><span class="n">source_caller</span><span class="p">,</span> |
| <span class="n">mode</span><span class="o">=</span><span class="n">_RedisMode</span><span class="o">.</span><span class="n">WRITE</span><span class="p">)</span> |
| |
| <span class="k">def</span><span class="w"> </span><span class="nf">expand</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="n">elements</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">]]</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]:</span> |
| <span class="k">return</span> <span class="n">elements</span> <span class="o">|</span> <span class="n">RequestResponseIO</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">redis_caller</span><span class="p">)</span> |
| |
| |
| <span class="k">def</span><span class="w"> </span><span class="nf">ensure_coders_exist</span><span class="p">(</span><span class="n">request_coder</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""checks if the coder exists to encode the request for caching."""</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">request_coder</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'need request coder to be able to use '</span> |
| <span class="s1">'Cache with RequestResponseIO.'</span><span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="RedisCache"> |
| <a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.RedisCache">[docs]</a> |
| <span class="k">class</span><span class="w"> </span><span class="nc">RedisCache</span><span class="p">(</span><span class="n">Cache</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Configure cache using Redis for</span> |
| <span class="sd"> :class:`apache_beam.io.requestresponse.RequestResponseIO`."""</span> |
| <span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">host</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">port</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> |
| <span class="n">time_to_live</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="n">timedelta</span><span class="p">]</span> <span class="o">=</span> <span class="n">DEFAULT_CACHE_ENTRY_TTL_SEC</span><span class="p">,</span> |
| <span class="o">*</span><span class="p">,</span> |
| <span class="n">request_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">response_coder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="o">**</span><span class="n">kwargs</span><span class="p">,</span> |
| <span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Args:</span> |
| <span class="sd"> host (str): The hostname or IP address of the Redis server.</span> |
| <span class="sd"> port (int): The port number of the Redis server.</span> |
| <span class="sd"> time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for</span> |
| <span class="sd"> records stored in Redis. Provide an integer (in seconds) or a</span> |
| <span class="sd"> `datetime.timedelta` object.</span> |
| <span class="sd"> request_coder: (Optional[`coders.Coder`]) coder for encoding requests.</span> |
| <span class="sd"> response_coder: (Optional[`coders.Coder`]) coder for decoding responses</span> |
| <span class="sd"> received from Redis.</span> |
| <span class="sd"> kwargs: Optional additional keyword arguments that</span> |
| <span class="sd"> are required to connect to your redis server. Same as `redis.Redis()`.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_host</span> <span class="o">=</span> <span class="n">host</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_port</span> <span class="o">=</span> <span class="n">port</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_time_to_live</span> <span class="o">=</span> <span class="n">time_to_live</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span> <span class="o">=</span> <span class="n">request_coder</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_response_coder</span> <span class="o">=</span> <span class="n">response_coder</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span> <span class="o">=</span> <span class="n">kwargs</span> <span class="k">if</span> <span class="n">kwargs</span> <span class="k">else</span> <span class="p">{}</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_source_caller</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <div class="viewcode-block" id="RedisCache.get_read"> |
| <a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.RedisCache.get_read">[docs]</a> |
| <span class="k">def</span><span class="w"> </span><span class="nf">get_read</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""get_read returns a PTransform for reading from the cache."""</span> |
| <span class="n">ensure_coders_exist</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">_ReadFromRedis</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_host</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_port</span><span class="p">,</span> |
| <span class="n">time_to_live</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_time_to_live</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="p">,</span> |
| <span class="n">request_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span><span class="p">,</span> |
| <span class="n">response_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_response_coder</span><span class="p">,</span> |
| <span class="n">source_caller</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_source_caller</span><span class="p">)</span></div> |
| |
| |
| <div class="viewcode-block" id="RedisCache.get_write"> |
| <a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.RedisCache.get_write">[docs]</a> |
| <span class="k">def</span><span class="w"> </span><span class="nf">get_write</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""returns a PTransform for writing to the cache."""</span> |
| <span class="n">ensure_coders_exist</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">_WriteToRedis</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_host</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_port</span><span class="p">,</span> |
| <span class="n">time_to_live</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_time_to_live</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="p">,</span> |
| <span class="n">request_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span><span class="p">,</span> |
| <span class="n">response_coder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_response_coder</span><span class="p">,</span> |
| <span class="n">source_caller</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_source_caller</span><span class="p">)</span></div> |
| |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">source_caller</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_caller</span> |
| |
| <span class="nd">@source_caller</span><span class="o">.</span><span class="n">setter</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">source_caller</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source_caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_source_caller</span> <span class="o">=</span> <span class="n">source_caller</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">request_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span> |
| |
| <span class="nd">@request_coder</span><span class="o">.</span><span class="n">setter</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">request_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">request_coder</span><span class="p">:</span> <span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_request_coder</span> <span class="o">=</span> <span class="n">request_coder</span></div> |
| |
| |
| |
| <span class="k">class</span><span class="w"> </span><span class="nc">FlattenBatch</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Flatten a batched PCollection."""</span> |
| <span class="k">def</span><span class="w"> </span><span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">elements</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">for</span> <span class="n">element</span> <span class="ow">in</span> <span class="n">elements</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">element</span> |
| |
| |
| <div class="viewcode-block" id="RequestResponseIO"> |
| <a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.RequestResponseIO">[docs]</a> |
| <span class="k">class</span><span class="w"> </span><span class="nc">RequestResponseIO</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">RequestT</span><span class="p">],</span> |
| <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]]):</span> |
| <span class="w"> </span><span class="sd">"""A :class:`RequestResponseIO` transform to read and write to APIs.</span> |
| |
| <span class="sd"> Processes an input :class:`~apache_beam.pvalue.PCollection` of requests</span> |
| <span class="sd"> by making a call to the API as defined in `Caller`'s `__call__` method</span> |
| <span class="sd"> and returns a :class:`~apache_beam.pvalue.PCollection` of responses.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">caller</span><span class="p">:</span> <span class="n">Caller</span><span class="p">[</span><span class="n">RequestT</span><span class="p">,</span> <span class="n">ResponseT</span><span class="p">],</span> |
| <span class="n">timeout</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">float</span><span class="p">]</span> <span class="o">=</span> <span class="n">DEFAULT_TIMEOUT_SECS</span><span class="p">,</span> |
| <span class="n">should_backoff</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">ShouldBackOff</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">repeater</span><span class="p">:</span> <span class="n">Repeater</span> <span class="o">=</span> <span class="n">ExponentialBackOffRepeater</span><span class="p">(),</span> |
| <span class="n">cache</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Cache</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">throttler</span><span class="p">:</span> <span class="n">PreCallThrottler</span> <span class="o">=</span> <span class="n">DefaultThrottler</span><span class="p">(),</span> |
| <span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Instantiates a RequestResponseIO transform.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> caller: an implementation of</span> |
| <span class="sd"> `Caller` object that makes call to the API.</span> |
| <span class="sd"> timeout (float): timeout value in seconds to wait for response from API.</span> |
| <span class="sd"> should_backoff: (Optional) provides methods for backoff.</span> |
| <span class="sd"> repeater: provides method to repeat failed requests to API due to service</span> |
| <span class="sd"> errors. Defaults to</span> |
| <span class="sd"> :class:`apache_beam.io.requestresponse.ExponentialBackOffRepeater` to</span> |
| <span class="sd"> repeat requests with exponential backoff.</span> |
| <span class="sd"> cache: (Optional) a `~apache_beam.io.requestresponse.Cache` object</span> |
| <span class="sd"> to use the appropriate cache.</span> |
| <span class="sd"> throttler: provides methods to pre-throttle a request. Defaults to</span> |
| <span class="sd"> :class:`apache_beam.io.requestresponse.DefaultThrottler` for</span> |
| <span class="sd"> client-side adaptive throttling using</span> |
| <span class="sd"> :class:`apache_beam.io.components.adaptive_throttler.AdaptiveThrottler`</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_caller</span> <span class="o">=</span> <span class="n">caller</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span> <span class="o">=</span> <span class="n">timeout</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_should_backoff</span> <span class="o">=</span> <span class="n">should_backoff</span> |
| <span class="k">if</span> <span class="n">repeater</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span> <span class="o">=</span> <span class="n">repeater</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span> <span class="o">=</span> <span class="n">NoOpsRepeater</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_cache</span> <span class="o">=</span> <span class="n">cache</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span> <span class="o">=</span> <span class="n">throttler</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batching_kwargs</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="o">.</span><span class="n">batch_elements_kwargs</span><span class="p">()</span> |
| |
| <div class="viewcode-block" id="RequestResponseIO.expand"> |
| <a class="viewcode-back" href="../../../apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.RequestResponseIO.expand">[docs]</a> |
| <span class="k">def</span><span class="w"> </span><span class="nf">expand</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">requests</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">RequestT</span><span class="p">])</span> <span class="o">-></span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ResponseT</span><span class="p">]:</span> |
| <span class="c1"># TODO(riteshghorse): handle Throttle PTransforms when available.</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cache</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_cache</span><span class="o">.</span><span class="n">source_caller</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_caller</span> |
| |
| <span class="n">inputs</span> <span class="o">=</span> <span class="n">requests</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cache</span><span class="p">:</span> |
| <span class="c1"># read from cache.</span> |
| <span class="n">outputs</span> <span class="o">=</span> <span class="n">inputs</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cache</span><span class="o">.</span><span class="n">get_read</span><span class="p">()</span> |
| <span class="c1"># filter responses that are None and send them to the Call transform</span> |
| <span class="c1"># to fetch a value from external service.</span> |
| <span class="n">cached_responses</span><span class="p">,</span> <span class="n">inputs</span> <span class="o">=</span> <span class="p">(</span><span class="n">outputs</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">_FilterCacheReadFn</span><span class="p">()</span> |
| <span class="p">)</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span> |
| <span class="s1">'cache_misses'</span><span class="p">,</span> <span class="n">main</span><span class="o">=</span><span class="s1">'cached_responses'</span><span class="p">))</span> |
| |
| <span class="c1"># Batch elements if batching is enabled.</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batching_kwargs</span><span class="p">:</span> |
| <span class="n">inputs</span> <span class="o">=</span> <span class="n">inputs</span> <span class="o">|</span> <span class="n">BatchElements</span><span class="p">(</span><span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">_batching_kwargs</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="p">,</span> <span class="n">DefaultThrottler</span><span class="p">):</span> |
| <span class="c1"># DefaultThrottler applies throttling in the DoFn of</span> |
| <span class="c1"># Call PTransform.</span> |
| <span class="n">responses</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">inputs</span> |
| <span class="o">|</span> <span class="n">_Call</span><span class="p">(</span> |
| <span class="n">caller</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="p">,</span> |
| <span class="n">timeout</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span><span class="p">,</span> |
| <span class="n">should_backoff</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_should_backoff</span><span class="p">,</span> |
| <span class="n">repeater</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span><span class="p">,</span> |
| <span class="n">throttler</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_throttler</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># No throttling mechanism. The requests are made to the external source</span> |
| <span class="c1"># as they come.</span> |
| <span class="n">responses</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">inputs</span> |
| <span class="o">|</span> <span class="n">_Call</span><span class="p">(</span> |
| <span class="n">caller</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_caller</span><span class="p">,</span> |
| <span class="n">timeout</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_timeout</span><span class="p">,</span> |
| <span class="n">should_backoff</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_should_backoff</span><span class="p">,</span> |
| <span class="n">repeater</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_repeater</span><span class="p">))</span> |
| |
| <span class="c1"># if batching is enabled then handle accordingly.</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batching_kwargs</span><span class="p">:</span> |
| <span class="n">responses</span> <span class="o">=</span> <span class="n">responses</span> <span class="o">|</span> <span class="s2">"FlattenBatch"</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">FlattenBatch</span><span class="p">())</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cache</span><span class="p">:</span> |
| <span class="c1"># write to cache.</span> |
| <span class="n">_</span> <span class="o">=</span> <span class="n">responses</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cache</span><span class="o">.</span><span class="n">get_write</span><span class="p">()</span> |
| <span class="k">return</span> <span class="p">(</span><span class="n">cached_responses</span><span class="p">,</span> <span class="n">responses</span><span class="p">)</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">()</span> |
| |
| <span class="k">return</span> <span class="n">responses</span></div> |
| </div> |
| |
| </pre></div> |
| |
| </div> |
| </div> |
| <footer> |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <p>© Copyright Apache Beam.</p> |
| </div> |
| |
| Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a |
| <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> |
| provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| |
| </footer> |
| </div> |
| </div> |
| </section> |
| </div> |
| <script> |
| // Once the DOM is ready, switch on the theme's navigation behaviour. |
| // NOTE(review): the `true` argument presumably enables the sticky sidebar; confirm against theme.js. |
| jQuery(function enableThemeNavigation() { |
| var navigation = SphinxRtdTheme.Navigation; |
| navigation.enable(true); |
| }); |
| </script> |
| |
| </body> |
| </html> |