| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <title>apache_beam.io.iobase — Apache Beam 2.47.0 documentation</title> |
| |
| |
| |
| |
| |
| |
| |
| |
| <script type="text/javascript" src="../../../_static/js/modernizr.min.js"></script> |
| |
| |
| <script type="text/javascript" id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script> |
| <script type="text/javascript" src="../../../_static/jquery.js"></script> |
| <script type="text/javascript" src="../../../_static/underscore.js"></script> |
| <script type="text/javascript" src="../../../_static/doctools.js"></script> |
| <script type="text/javascript" src="../../../_static/language_data.js"></script> |
| <script async="async" type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
| |
| <script type="text/javascript" src="../../../_static/js/theme.js"></script> |
| |
| |
| |
| |
| <link rel="stylesheet" href="../../../_static/css/theme.css" type="text/css" /> |
| <link rel="stylesheet" href="../../../_static/pygments.css" type="text/css" /> |
| <link rel="index" title="Index" href="../../../genindex.html" /> |
| <link rel="search" title="Search" href="../../../search.html" /> |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search" > |
| |
| |
| |
| <a href="../../../index.html" class="icon icon-home"> Apache Beam |
| |
| |
| |
| </a> |
| |
| |
| |
| |
| <div class="version"> |
| 2.47.0 |
| </div> |
| |
| |
| |
| |
| <div role="search"> |
| <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get"> |
| <input type="text" name="q" placeholder="Search docs" /> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| </div> |
| |
| |
| </div> |
| |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| |
| |
| |
| |
| |
| |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li> |
| </ul> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
| </ul> |
| |
| |
| |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
| |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="../../../index.html">Apache Beam</a> |
| |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| |
| <ul class="wy-breadcrumbs"> |
| |
| <li><a href="../../../index.html">Docs</a> »</li> |
| |
| <li><a href="../../index.html">Module code</a> »</li> |
| |
| <li>apache_beam.io.iobase</li> |
| |
| |
| <li class="wy-breadcrumbs-aside"> |
| |
| </li> |
| |
| </ul> |
| |
| |
| <hr/> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.io.iobase</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="sd">"""Sources and sinks.</span> |
| |
| <span class="sd">A Source manages record-oriented data input from a particular kind of source</span> |
| <span class="sd">(e.g. a set of files, a database table, etc.). The reader() method of a source</span> |
| <span class="sd">returns a reader object supporting the iterator protocol; iteration yields</span> |
| <span class="sd">raw records of unprocessed, serialized data.</span> |
| |
| |
| <span class="sd">A Sink manages record-oriented data output to a particular kind of sink</span> |
| <span class="sd">(e.g. a set of files, a database table, etc.). The writer() method of a sink</span> |
| <span class="sd">returns a writer object supporting writing records of serialized data to</span> |
| <span class="sd">the sink.</span> |
| <span class="sd">"""</span> |
| |
| <span class="c1"># pytype: skip-file</span> |
| |
| <span class="kn">import</span> <span class="nn">logging</span> |
| <span class="kn">import</span> <span class="nn">math</span> |
| <span class="kn">import</span> <span class="nn">random</span> |
| <span class="kn">import</span> <span class="nn">uuid</span> |
| <span class="kn">from</span> <span class="nn">collections</span> <span class="kn">import</span> <span class="n">namedtuple</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Any</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Iterator</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Optional</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Tuple</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Union</span> |
| |
| <span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">coders</span> |
| <span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">pvalue</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.coders.coders</span> <span class="kn">import</span> <span class="n">_MemoizingPickleCoder</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.internal</span> <span class="kn">import</span> <span class="n">pickler</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.portability</span> <span class="kn">import</span> <span class="n">common_urns</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.portability</span> <span class="kn">import</span> <span class="n">python_urns</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.portability.api</span> <span class="kn">import</span> <span class="n">beam_runner_api_pb2</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="kn">import</span> <span class="n">AsIter</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="kn">import</span> <span class="n">AsSingleton</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">Impulse</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">PTransform</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">core</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">ptransform</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">window</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="kn">import</span> <span class="n">DisplayDataItem</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="kn">import</span> <span class="n">HasDisplayData</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.utils</span> <span class="kn">import</span> <span class="n">timestamp</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.utils</span> <span class="kn">import</span> <span class="n">urns</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.utils.windowed_value</span> <span class="kn">import</span> <span class="n">WindowedValue</span> |
| |
| <span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="s1">'BoundedSource'</span><span class="p">,</span> |
| <span class="s1">'RangeTracker'</span><span class="p">,</span> |
| <span class="s1">'Read'</span><span class="p">,</span> |
| <span class="s1">'RestrictionProgress'</span><span class="p">,</span> |
| <span class="s1">'RestrictionTracker'</span><span class="p">,</span> |
| <span class="s1">'WatermarkEstimator'</span><span class="p">,</span> |
| <span class="s1">'Sink'</span><span class="p">,</span> |
| <span class="s1">'Write'</span><span class="p">,</span> |
| <span class="s1">'Writer'</span> |
| <span class="p">]</span> |
| |
| <span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span> |
| |
| <span class="c1"># Encapsulates information about a bundle of a source generated when method</span> |
| <span class="c1"># BoundedSource.split() is invoked.</span> |
| <span class="c1"># This is a named 4-tuple that has the following fields.</span> |
| <span class="c1"># * weight - a number that represents the size of the bundle. This value will</span> |
| <span class="c1"># be used to compare the relative sizes of bundles generated by the</span> |
| <span class="c1"># current source.</span> |
| <span class="c1"># The weight returned here could be specified using a unit of your</span> |
| <span class="c1"># choice (for example, bundles of sizes 100MB, 200MB, and 700MB may</span> |
| <span class="c1"># specify weights 100, 200, 700 or 1, 2, 7) but all bundles of a</span> |
| <span class="c1"># source should specify the weight using the same unit.</span> |
| <span class="c1"># * source - a BoundedSource object for the bundle.</span> |
| <span class="c1"># * start_position - starting position of the bundle</span> |
| <span class="c1"># * stop_position - ending position of the bundle.</span> |
| <span class="c1">#</span> |
| <span class="c1"># Type for start and stop positions are specific to the bounded source and must</span> |
| <span class="c1"># be consistent throughout.</span> |
| <span class="n">SourceBundle</span> <span class="o">=</span> <span class="n">namedtuple</span><span class="p">(</span> |
| <span class="s1">'SourceBundle'</span><span class="p">,</span> <span class="s1">'weight source start_position stop_position'</span><span class="p">)</span> |
| |
| |
| <span class="k">class</span> <span class="nc">SourceBase</span><span class="p">(</span><span class="n">HasDisplayData</span><span class="p">,</span> <span class="n">urns</span><span class="o">.</span><span class="n">RunnerApiFn</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Base class for all sources that can be passed to beam.io.Read(...).</span> |
| <span class="sd"> """</span> |
| <span class="n">urns</span><span class="o">.</span><span class="n">RunnerApiFn</span><span class="o">.</span><span class="n">register_pickle_urn</span><span class="p">(</span><span class="n">python_urns</span><span class="o">.</span><span class="n">PICKLED_SOURCE</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">is_bounded</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> bool</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span> |
| |
| |
| <div class="viewcode-block" id="BoundedSource"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource">[docs]</a><span class="k">class</span> <span class="nc">BoundedSource</span><span class="p">(</span><span class="n">SourceBase</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A source that reads a finite amount of input records.</span> |
| |
| <span class="sd"> This class defines the following operations, which can be used to read the source</span> |
| <span class="sd"> efficiently.</span> |
| |
| <span class="sd"> * Size estimation - method ``estimate_size()`` may return an accurate</span> |
| <span class="sd"> estimation in bytes for the size of the source.</span> |
| <span class="sd"> * Splitting into bundles of a given size - method ``split()`` can be used to</span> |
| <span class="sd"> split the source into a set of sub-sources (bundles) based on a desired</span> |
| <span class="sd"> bundle size.</span> |
| <span class="sd"> * Getting a RangeTracker - method ``get_range_tracker()`` should return a</span> |
| <span class="sd"> ``RangeTracker`` object for a given position range for the position type</span> |
| <span class="sd"> of the records returned by the source.</span> |
| <span class="sd"> * Reading the data - method ``read()`` can be used to read data from the</span> |
| <span class="sd"> source while respecting the boundaries defined by a given</span> |
| <span class="sd"> ``RangeTracker``.</span> |
| |
| <span class="sd"> A runner reads the source in two steps.</span> |
| |
| <span class="sd"> (1) Method ``get_range_tracker()`` will be invoked with start and end</span> |
| <span class="sd"> positions to obtain a ``RangeTracker`` for the range of positions the</span> |
| <span class="sd"> runner intends to read. The source must define a default initial start and end</span> |
| <span class="sd"> position range. These positions must be used if the start and/or end</span> |
| <span class="sd"> positions passed to the method ``get_range_tracker()`` are ``None``.</span> |
| <span class="sd"> (2) Method ``read()`` will be invoked with the ``RangeTracker`` obtained in the</span> |
| <span class="sd"> previous step.</span> |
| |
| <span class="sd"> **Mutability**</span> |
| |
| <span class="sd"> A ``BoundedSource`` object should not be mutated while</span> |
| <span class="sd"> its methods (for example, ``read()``) are being invoked by a runner. Runner</span> |
| <span class="sd"> implementations may invoke methods of ``BoundedSource`` objects through</span> |
| <span class="sd"> multi-threaded and/or reentrant execution modes.</span> |
| <span class="sd"> """</span> |
| <div class="viewcode-block" id="BoundedSource.estimate_size"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.estimate_size">[docs]</a> <span class="k">def</span> <span class="nf">estimate_size</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> Optional[int]</span> |
| |
| <span class="w"> </span><span class="sd">"""Estimates the size of source in bytes.</span> |
| |
| <span class="sd"> An estimate of the total size (in bytes) of the data that would be read</span> |
| <span class="sd"> from this source. This estimate is in terms of external storage size,</span> |
| <span class="sd"> before performing decompression or other processing.</span> |
| |
| <span class="sd"> Returns:</span> |
| <span class="sd"> estimated size of the source if the size can be determined, ``None``</span> |
| <span class="sd"> otherwise.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="BoundedSource.split"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.split">[docs]</a> <span class="k">def</span> <span class="nf">split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> |
| <span class="n">desired_bundle_size</span><span class="p">,</span> <span class="c1"># type: int</span> |
| <span class="n">start_position</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Any]</span> |
| <span class="n">stop_position</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Any]</span> |
| <span class="p">):</span> |
| <span class="c1"># type: (...) -> Iterator[SourceBundle]</span> |
| |
| <span class="w"> </span><span class="sd">"""Splits the source into a set of bundles.</span> |
| |
| <span class="sd"> Bundles should be approximately of size ``desired_bundle_size`` bytes.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> desired_bundle_size: the desired size (in bytes) of the bundles returned.</span> |
| <span class="sd"> start_position: if specified, the given position must be used as the</span> |
| <span class="sd"> starting position of the first bundle.</span> |
| <span class="sd"> stop_position: if specified, the given position must be used as the ending</span> |
| <span class="sd"> position of the last bundle.</span> |
| <span class="sd"> Returns:</span> |
| <span class="sd"> an iterator of objects of type 'SourceBundle' that gives information about</span> |
| <span class="sd"> the generated bundles.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="BoundedSource.get_range_tracker"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.get_range_tracker">[docs]</a> <span class="k">def</span> <span class="nf">get_range_tracker</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> |
| <span class="n">start_position</span><span class="p">,</span> <span class="c1"># type: Optional[Any]</span> |
| <span class="n">stop_position</span><span class="p">,</span> <span class="c1"># type: Optional[Any]</span> |
| <span class="p">):</span> |
| <span class="c1"># type: (...) -> RangeTracker</span> |
| |
| <span class="w"> </span><span class="sd">"""Returns a RangeTracker for a given position range.</span> |
| |
| <span class="sd"> The framework may invoke the ``read()`` method with the ``RangeTracker`` object returned</span> |
| <span class="sd"> here to read data from the source.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> start_position: starting position of the range. If ``None``, the default start</span> |
| <span class="sd"> position of the source must be used.</span> |
| <span class="sd"> stop_position: ending position of the range. If ``None``, the default stop</span> |
| <span class="sd"> position of the source must be used.</span> |
| <span class="sd"> Returns:</span> |
| <span class="sd"> a ``RangeTracker`` for the given position range.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="BoundedSource.read"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.read">[docs]</a> <span class="k">def</span> <span class="nf">read</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">range_tracker</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Returns an iterator that reads data from the source.</span> |
| |
| <span class="sd"> The returned set of data must respect the boundaries defined by the given</span> |
| <span class="sd"> ``RangeTracker`` object. For example:</span> |
| |
| <span class="sd"> * The returned data must be for the range</span> |
| <span class="sd"> ``[range_tracker.start_position, range_tracker.stop_position)``. Note</span> |
| <span class="sd"> that a source may decide to return records that start after</span> |
| <span class="sd"> ``range_tracker.stop_position``. See documentation in class</span> |
| <span class="sd"> ``RangeTracker`` for more details. Also, note that the framework might</span> |
| <span class="sd"> invoke ``range_tracker.try_split()`` to perform dynamic split</span> |
| <span class="sd"> operations. range_tracker.stop_position may be updated</span> |
| <span class="sd"> dynamically due to successful dynamic split operations.</span> |
| <span class="sd"> * Method ``range_tracker.try_claim()`` must be invoked for every record</span> |
| <span class="sd"> that starts at a split point.</span> |
| <span class="sd"> * Method ``range_tracker.set_current_position()`` may be invoked for</span> |
| <span class="sd"> records that do not start at split points.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> range_tracker: a ``RangeTracker`` whose boundaries must be respected</span> |
| <span class="sd"> when reading data from the source. A runner that reads this</span> |
| <span class="sd"> source must pass a ``RangeTracker`` object that is not</span> |
| <span class="sd"> ``None``.</span> |
| <span class="sd"> Returns:</span> |
| <span class="sd"> an iterator of data read by the source.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="BoundedSource.default_output_coder"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.default_output_coder">[docs]</a> <span class="k">def</span> <span class="nf">default_output_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Coder that should be used for the records returned by the source.</span> |
| |
| <span class="sd"> Should be overridden by sources that produce objects that can be encoded</span> |
| <span class="sd"> more efficiently than pickling.</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="nb">object</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BoundedSource.is_bounded"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.is_bounded">[docs]</a> <span class="k">def</span> <span class="nf">is_bounded</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="kc">True</span></div></div> |
| |
| |
| <div class="viewcode-block" id="RangeTracker"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker">[docs]</a><span class="k">class</span> <span class="nc">RangeTracker</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A thread-safe object used by the Dataflow source framework.</span> |
| |
| <span class="sd"> A Dataflow source is defined using a ``BoundedSource`` and a ``RangeTracker``</span> |
| <span class="sd"> pair. A ``RangeTracker`` is used by the Dataflow source framework to perform</span> |
| <span class="sd"> dynamic work rebalancing of position-based sources.</span> |
| |
| <span class="sd"> **Position-based sources**</span> |
| |
| <span class="sd"> A position-based source is one where the source can be described by a range</span> |
| <span class="sd"> of positions of an ordered type and the records returned by the reader can be</span> |
| <span class="sd"> described by positions of the same type.</span> |
| |
| <span class="sd"> In case a record occupies a range of positions in the source, the most</span> |
| <span class="sd"> important thing about the record is the position where it starts.</span> |
| |
| <span class="sd"> Defining the semantics of positions for a source is entirely up to the source</span> |
| <span class="sd"> class, however the chosen definitions have to obey certain properties in order</span> |
| <span class="sd"> to make it possible to correctly split the source into parts, including</span> |
| <span class="sd"> dynamic splitting. Two main aspects need to be defined:</span> |
| |
| <span class="sd"> 1. How to assign starting positions to records.</span> |
| <span class="sd"> 2. Which records should be read by a source with a range '[A, B)'.</span> |
| |
| <span class="sd"> Moreover, reading a range must be *efficient*, i.e., the performance of</span> |
| <span class="sd"> reading a range should not significantly depend on the location of the range.</span> |
| <span class="sd"> For example, reading the range [A, B) should not require reading all data</span> |
| <span class="sd"> before 'A'.</span> |
| |
| <span class="sd"> The sections below explain exactly what properties these definitions must</span> |
| <span class="sd"> satisfy, and how to use a ``RangeTracker`` with a properly defined source.</span> |
| |
| <span class="sd"> **Properties of position-based sources**</span> |
| |
| <span class="sd"> The main requirement for position-based sources is *associativity*: reading</span> |
| <span class="sd"> records from '[A, B)' and records from '[B, C)' should give the same</span> |
| <span class="sd"> records as reading from '[A, C)', where 'A &lt;= B &lt;= C'. This property</span> |
| <span class="sd"> ensures that no matter how a range of positions is split into arbitrarily many</span> |
| <span class="sd"> sub-ranges, the total set of records described by them stays the same.</span> |
| |
| <span class="sd"> The other important property is how the source's range relates to positions of</span> |
| <span class="sd"> records in the source. In many sources each record can be identified by a</span> |
| <span class="sd"> unique starting position. In this case:</span> |
| |
| <span class="sd"> * All records returned by a source '[A, B)' must have starting positions in</span> |
| <span class="sd"> this range.</span> |
| <span class="sd"> * All but the last record should end within this range. The last record may or</span> |
| <span class="sd"> may not extend past the end of the range.</span> |
| <span class="sd"> * Records should not overlap.</span> |
| |
| <span class="sd"> Such sources should define "read '[A, B)'" as "read from the first record</span> |
| <span class="sd"> starting at or after 'A', up to but not including the first record starting</span> |
| <span class="sd"> at or after 'B'".</span> |
| |
| <span class="sd"> Some examples of such sources include reading lines or CSV from a text file,</span> |
| <span class="sd"> reading keys and values from a BigTable, etc.</span> |
| |
| <span class="sd"> The concept of *split points* makes it possible to extend the definitions for dealing</span> |
| <span class="sd"> with sources where some records cannot be identified by a unique starting</span> |
| <span class="sd"> position.</span> |
| |
| <span class="sd"> In all cases, all records returned by a source '[A, B)' must *start* at or</span> |
| <span class="sd"> after 'A'.</span> |
| |
| <span class="sd"> **Split points**</span> |
| |
| <span class="sd"> Some sources may have records that are not directly addressable. For example,</span> |
| <span class="sd"> imagine a file format consisting of a sequence of compressed blocks. Each</span> |
| <span class="sd"> block can be assigned an offset, but records within the block cannot be</span> |
| <span class="sd"> directly addressed without decompressing the block. Let us refer to this</span> |
| <span class="sd"> hypothetical format as *CBF (Compressed Blocks Format)*.</span> |
| |
| <span class="sd"> Many such formats can still satisfy the associativity property. For example,</span> |
| <span class="sd"> in CBF, reading '[A, B)' can mean "read all the records in all blocks whose</span> |
| <span class="sd"> starting offset is in '[A, B)'".</span> |
| |
| <span class="sd"> To support such complex formats, we introduce the notion of *split points*. We</span> |
| <span class="sd"> say that a record is a split point if there exists a position 'A' such that</span> |
| <span class="sd"> the record is the first one to be returned when reading the range</span> |
| <span class="sd"> '[A, infinity)'. In CBF, the only split points would be the first records</span> |
| <span class="sd"> in each block.</span> |
| |
| <span class="sd"> Split points allow us to define the meaning of a record's position and a</span> |
| <span class="sd"> source's range in all cases:</span> |
| |
| <span class="sd"> * For a record that is at a split point, its position is defined to be the</span> |
| <span class="sd"> largest 'A' such that reading a source with the range '[A, infinity)'</span> |
| <span class="sd"> returns this record.</span> |
| <span class="sd"> * Positions of other records are only required to be non-decreasing.</span> |
| <span class="sd"> * Reading the source '[A, B)' must return records starting from the first</span> |
| <span class="sd"> split point at or after 'A', up to but not including the first split point</span> |
| <span class="sd"> at or after 'B'. In particular, this means that the first record returned</span> |
| <span class="sd"> by a source MUST always be a split point.</span> |
| <span class="sd"> * Positions of split points must be unique.</span> |
| |
| <span class="sd"> As a result, for any decomposition of the full range of the source into</span> |
| <span class="sd"> position ranges, the total set of records will be the full set of records in</span> |
| <span class="sd"> the source, and each record will be read exactly once.</span> |
| |
| <span class="sd"> **Consumed positions**</span> |
| |
| <span class="sd"> As the source is being read, and records read from it are being passed to the</span> |
| <span class="sd"> downstream transforms in the pipeline, we say that positions in the source are</span> |
| <span class="sd"> being *consumed*. When a reader has read a record (or promised to a caller</span> |
| <span class="sd"> that a record will be returned), positions up to and including the record's</span> |
| <span class="sd"> start position are considered *consumed*.</span> |
| |
| <span class="sd"> Dynamic splitting can happen only at *unconsumed* positions. If the reader</span> |
| <span class="sd"> just returned a record at offset 42 in a file, dynamic splitting can happen</span> |
| <span class="sd"> only at offset 43 or beyond, as otherwise that record could be read twice (by</span> |
| <span class="sd"> the current reader and by a reader of the task starting at 43).</span> |
| <span class="sd"> """</span> |
| |
| <span class="n">SPLIT_POINTS_UNKNOWN</span> <span class="o">=</span> <span class="nb">object</span><span class="p">()</span> |
| |
| <div class="viewcode-block" id="RangeTracker.start_position"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.start_position">[docs]</a> <span class="k">def</span> <span class="nf">start_position</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Returns the starting position of the current range, inclusive."""</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div> |
| |
| <div class="viewcode-block" id="RangeTracker.stop_position"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.stop_position">[docs]</a> <span class="k">def</span> <span class="nf">stop_position</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Returns the ending position of the current range, exclusive."""</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div> |
| |
| <div class="viewcode-block" id="RangeTracker.try_claim"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.try_claim">[docs]</a> <span class="k">def</span> <span class="nf">try_claim</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">position</span><span class="p">):</span> <span class="c1"># pylint: disable=unused-argument</span> |
| <span class="w"> </span><span class="sd">"""Atomically determines if a record at a split point is within the range.</span> |
| |
| <span class="sd"> This method should be called **if and only if** the record is at a split</span> |
| <span class="sd"> point. This method may modify the internal state of the ``RangeTracker`` by</span> |
| <span class="sd"> updating the last-consumed position to ``position``.</span> |
| |
| <span class="sd"> **Thread safety**</span> |
| |
| <span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span> |
| <span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span> |
| <span class="sd"> lock object.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> position: starting position of a record being read by a source.</span> |
| |
| <span class="sd"> Returns:</span> |
| <span class="sd"> ``True``, if the given position falls within the current range, returns</span> |
| <span class="sd"> ``False`` otherwise.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="RangeTracker.set_current_position"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.set_current_position">[docs]</a> <span class="k">def</span> <span class="nf">set_current_position</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">position</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Updates the last-consumed position to the given position.</span> |
| |
| <span class="sd"> A source may invoke this method for records that do not start at split</span> |
| <span class="sd"> points. This may modify the internal state of the ``RangeTracker``. If the</span> |
| <span class="sd"> record starts at a split point, method ``try_claim()`` **must** be invoked</span> |
| <span class="sd"> instead of this method.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> position: starting position of a record being read by a source.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="RangeTracker.position_at_fraction"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.position_at_fraction">[docs]</a> <span class="k">def</span> <span class="nf">position_at_fraction</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fraction</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Returns the position at the given fraction.</span> |
| |
| <span class="sd"> Given a fraction within the range [0.0, 1.0) this method will return the</span> |
| <span class="sd"> position at the given fraction compared to the position range</span> |
| <span class="sd"> [self.start_position, self.stop_position).</span> |
| |
| <span class="sd"> **Thread safety**</span> |
| |
| <span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span> |
| <span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span> |
| <span class="sd"> lock object.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> fraction: a float value within the range [0.0, 1.0).</span> |
| <span class="sd"> Returns:</span> |
| <span class="sd"> a position within the range [self.start_position, self.stop_position).</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="RangeTracker.try_split"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.try_split">[docs]</a> <span class="k">def</span> <span class="nf">try_split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">position</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Atomically splits the current range.</span> |
| |
| <span class="sd"> Determines a position to split the current range, split_position, based on</span> |
| <span class="sd"> the given position. In most cases split_position and position will be the</span> |
| <span class="sd"> same.</span> |
| |
| <span class="sd"> Splits the current range '[self.start_position, self.stop_position)'</span> |
| <span class="sd"> into a "primary" part '[self.start_position, split_position)' and a</span> |
| <span class="sd"> "residual" part '[split_position, self.stop_position)', assuming the</span> |
| <span class="sd"> current last-consumed position is within</span> |
| <span class="sd"> '[self.start_position, split_position)' (i.e., split_position has not been</span> |
| <span class="sd"> consumed yet).</span> |
| |
| <span class="sd"> If successful, updates the current range to be the primary and returns a</span> |
| <span class="sd"> tuple (split_position, split_fraction). split_fraction should be the</span> |
| <span class="sd"> fraction of size of range '[self.start_position, split_position)' compared</span> |
| <span class="sd"> to the original (before split) range</span> |
| <span class="sd"> '[self.start_position, self.stop_position)'.</span> |
| |
| <span class="sd"> If the split_position has already been consumed, returns ``None``.</span> |
| |
| <span class="sd"> **Thread safety**</span> |
| |
| <span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span> |
| <span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span> |
| <span class="sd"> lock object.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> position: suggested position where the current range should try to</span> |
| <span class="sd"> be split at.</span> |
| <span class="sd"> Returns:</span> |
| <span class="sd"> a tuple containing the split position and split fraction if split is</span> |
| <span class="sd"> successful. Returns ``None`` otherwise.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="RangeTracker.fraction_consumed"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.fraction_consumed">[docs]</a> <span class="k">def</span> <span class="nf">fraction_consumed</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Returns the approximate fraction of consumed positions in the source.</span> |
| |
| <span class="sd"> **Thread safety**</span> |
| |
| <span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span> |
| <span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span> |
| <span class="sd"> lock object.</span> |
| |
| <span class="sd"> Returns:</span> |
| <span class="sd"> the approximate fraction of positions that have been consumed by</span> |
| <span class="sd"> successful 'try_split()' and 'try_claim()' calls, or</span> |
| <span class="sd"> 0.0 if no such calls have happened.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="RangeTracker.split_points"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.split_points">[docs]</a> <span class="k">def</span> <span class="nf">split_points</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Gives the number of split points consumed and remaining.</span> |
| |
| <span class="sd"> For a ``RangeTracker`` used by a ``BoundedSource`` (within a</span> |
| <span class="sd"> ``BoundedSource.read()`` invocation) this method produces a 2-tuple that</span> |
| <span class="sd"> gives the number of split points consumed by the ``BoundedSource`` and the</span> |
| <span class="sd"> number of split points remaining within the range of the ``RangeTracker``</span> |
| <span class="sd"> that has not been consumed by the ``BoundedSource``.</span> |
| |
| <span class="sd"> More specifically, given that the position of the current record being read</span> |
| <span class="sd"> by ``BoundedSource`` is current_position, this method produces a tuple that</span> |
| <span class="sd"> consists of</span> |
| <span class="sd"> (1) number of split points in the range [self.start_position(),</span> |
| <span class="sd"> current_position) without including the split point that is currently being</span> |
| <span class="sd"> consumed. This represents the total amount of parallelism in the consumed</span> |
| <span class="sd"> part of the source.</span> |
| <span class="sd"> (2) number of split points within the range</span> |
| <span class="sd"> [current_position, self.stop_position()) including the split point that is</span> |
| <span class="sd"> currently being consumed. This represents the total amount of parallelism in</span> |
| <span class="sd"> the unconsumed part of the source.</span> |
| |
| <span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span> |
| <span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span> |
| <span class="sd"> lock object.</span> |
| |
| <span class="sd"> **General information about consumed and remaining number of split</span> |
| <span class="sd"> points returned by this method.**</span> |
| |
| <span class="sd"> * Before a source read (``BoundedSource.read()`` invocation) claims the</span> |
| <span class="sd"> first split point, number of consumed split points is 0. This condition</span> |
| <span class="sd"> holds independent of whether the input is "splittable". A splittable</span> |
| <span class="sd"> source is a source that has more than one split point.</span> |
| <span class="sd"> * Any source read that has only claimed one split point has 0 consumed</span> |
| <span class="sd"> split points since the first split point is the current split point and</span> |
| <span class="sd"> is still being processed. This condition holds independent of whether</span> |
| <span class="sd"> the input is splittable.</span> |
| <span class="sd"> * For an empty source read which never invokes</span> |
| <span class="sd"> ``RangeTracker.try_claim()``, the consumed number of split points is 0.</span> |
| <span class="sd"> This condition holds independent of whether the input is splittable.</span> |
| <span class="sd"> * For a source read which has invoked ``RangeTracker.try_claim()`` n</span> |
| <span class="sd"> times, the consumed number of split points is n - 1.</span> |
| <span class="sd"> * If a ``BoundedSource`` sets a callback through function</span> |
| <span class="sd"> ``set_split_points_unclaimed_callback()``, ``RangeTracker`` can use that</span> |
| <span class="sd"> callback when determining remaining number of split points.</span> |
| <span class="sd"> * Remaining split points should include the split point that is currently</span> |
| <span class="sd"> being consumed by the source read. Hence if the above callback returns</span> |
| <span class="sd"> an integer value n, remaining number of split points should be (n + 1).</span> |
| <span class="sd"> * After the last split point is claimed, the number of remaining split points becomes 1,</span> |
| <span class="sd"> because this unfinished read itself represents an unfinished split</span> |
| <span class="sd"> point.</span> |
| <span class="sd"> * After all records of the source have been consumed, remaining number of</span> |
| <span class="sd"> split points becomes 0 and consumed number of split points becomes equal</span> |
| <span class="sd"> to the total number of split points within the range being read by the</span> |
| <span class="sd"> source. This method does not address this condition and will continue to</span> |
| <span class="sd"> report number of consumed split points as</span> |
| <span class="sd"> ("total number of split points" - 1) and number of remaining split</span> |
| <span class="sd"> points as 1. A runner that performs the reading of the source can</span> |
| <span class="sd"> detect when all records have been consumed and adjust remaining and</span> |
| <span class="sd"> consumed number of split points accordingly.</span> |
| |
| <span class="sd"> **Examples**</span> |
| |
| <span class="sd"> (1) A "perfectly splittable" input which can be read in parallel down to the</span> |
| <span class="sd"> individual records.</span> |
| |
| <span class="sd"> Consider a perfectly splittable input that consists of 50 split points.</span> |
| |
| <span class="sd"> * Before a source read (``BoundedSource.read()`` invocation) claims the</span> |
| <span class="sd"> first split point, number of consumed split points is 0 and number of</span> |
| <span class="sd"> remaining split points is 50.</span> |
| <span class="sd"> * After claiming first split point, consumed number of split points is 0</span> |
| <span class="sd"> and remaining number of split points is 50.</span> |
| <span class="sd"> * After claiming split point #30, consumed number of split points is 29</span> |
| <span class="sd"> and remaining number of split points is 21.</span> |
| <span class="sd"> * After claiming all 50 split points, consumed number of split points is</span> |
| <span class="sd"> 49 and remaining number of split points is 1.</span> |
| |
| <span class="sd"> (2) a "block-compressed" file format such as ``avroio``, in which a block of</span> |
| <span class="sd"> records has to be read as a whole, but different blocks can be read in</span> |
| <span class="sd"> parallel.</span> |
| |
| <span class="sd"> Consider a block compressed input that consists of 5 blocks.</span> |
| |
| <span class="sd"> * Before a source read (``BoundedSource.read()`` invocation) claims the</span> |
| <span class="sd"> first split point (first block), number of consumed split points is 0</span> |
| <span class="sd"> and number of remaining split points is 5.</span> |
| <span class="sd"> * After claiming first split point, consumed number of split points is 0</span> |
| <span class="sd"> and remaining number of split points is 5.</span> |
| <span class="sd"> * After claiming split point #3, consumed number of split points is 2</span> |
| <span class="sd"> and remaining number of split points is 3.</span> |
| <span class="sd"> * After claiming all 5 split points, consumed number of split points is</span> |
| <span class="sd"> 4 and remaining number of split points is 1.</span> |
| |
| <span class="sd"> (3) an "unsplittable" input such as a cursor in a database or a gzip</span> |
| <span class="sd"> compressed file.</span> |
| |
| <span class="sd"> Such an input is considered to have only a single split point. Number of</span> |
| <span class="sd"> consumed split points is always 0 and number of remaining split points</span> |
| <span class="sd"> is always 1.</span> |
| |
| <span class="sd"> By default ``RangeTracker`` returns ``RangeTracker.SPLIT_POINTS_UNKNOWN`` for</span> |
| <span class="sd"> both consumed and remaining number of split points, which indicates that the</span> |
| <span class="sd"> number of split points consumed and remaining is unknown.</span> |
| |
| <span class="sd"> Returns:</span> |
| <span class="sd"> A pair that gives consumed and remaining number of split points. Consumed</span> |
| <span class="sd"> number of split points should be an integer larger than or equal to zero</span> |
| <span class="sd"> or ``RangeTracker.SPLIT_POINTS_UNKNOWN``. Remaining number of split points</span> |
| <span class="sd"> should be an integer larger than zero or</span> |
| <span class="sd"> ``RangeTracker.SPLIT_POINTS_UNKNOWN``.</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="n">RangeTracker</span><span class="o">.</span><span class="n">SPLIT_POINTS_UNKNOWN</span><span class="p">,</span> <span class="n">RangeTracker</span><span class="o">.</span><span class="n">SPLIT_POINTS_UNKNOWN</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="RangeTracker.set_split_points_unclaimed_callback"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.set_split_points_unclaimed_callback">[docs]</a> <span class="k">def</span> <span class="nf">set_split_points_unclaimed_callback</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">callback</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Sets a callback for determining the unclaimed number of split points.</span> |
| |
| <span class="sd"> By invoking this function, a ``BoundedSource`` can set a callback function</span> |
| <span class="sd"> that may get invoked by the ``RangeTracker`` to determine the number of</span> |
| <span class="sd"> unclaimed split points. A split point is unclaimed if</span> |
| <span class="sd"> ``RangeTracker.try_claim()`` method has not been successfully invoked for</span> |
| <span class="sd"> that particular split point. The callback function accepts a single</span> |
| <span class="sd"> parameter, a stop position for the BoundedSource (stop_position). If the</span> |
| <span class="sd"> record currently being consumed by the ``BoundedSource`` is at position</span> |
| <span class="sd"> current_position, callback should return the number of split points within</span> |
| <span class="sd"> the range (current_position, stop_position). Note that, this should not</span> |
| <span class="sd"> include the split point that is currently being consumed by the source.</span> |
| |
| <span class="sd"> This function must be implemented by subclasses before being used.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> callback: a function that takes a single parameter, a stop position,</span> |
| <span class="sd"> and returns unclaimed number of split points for the source read</span> |
| <span class="sd"> operation that is calling this function. Value returned from</span> |
| <span class="sd"> callback should be either an integer larger than or equal to</span> |
| <span class="sd"> zero or ``RangeTracker.SPLIT_POINTS_UNKNOWN``.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div> |
| |
| |
| <div class="viewcode-block" id="Sink"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink">[docs]</a><span class="k">class</span> <span class="nc">Sink</span><span class="p">(</span><span class="n">HasDisplayData</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""This class is deprecated, no backwards-compatibility guarantees.</span> |
| |
| <span class="sd"> A resource that can be written to using the ``beam.io.Write`` transform.</span> |
| |
| <span class="sd"> Here ``beam`` stands for Apache Beam Python code imported in the following manner.</span> |
| <span class="sd"> ``import apache_beam as beam``.</span> |
| |
| <span class="sd"> A parallel write to an ``iobase.Sink`` consists of three phases:</span> |
| |
| <span class="sd"> 1. A sequential *initialization* phase (e.g., creating a temporary output</span> |
| <span class="sd"> directory, etc.)</span> |
| <span class="sd"> 2. A parallel write phase where workers write *bundles* of records</span> |
| <span class="sd"> 3. A sequential *finalization* phase (e.g., committing the writes, merging</span> |
| <span class="sd"> output files, etc.)</span> |
| |
| <span class="sd"> Implementing a new sink requires extending two classes.</span> |
| |
| <span class="sd"> 1. iobase.Sink</span> |
| |
| <span class="sd"> ``iobase.Sink`` is an immutable logical description of the location/resource</span> |
| <span class="sd"> to write to. Depending on the type of sink, it may contain fields such as the</span> |
| <span class="sd"> path to an output directory on a filesystem, a database table name,</span> |
| <span class="sd"> etc. ``iobase.Sink`` provides methods for performing a write operation to the</span> |
| <span class="sd"> sink described by it. To this end, implementors of an extension of</span> |
| <span class="sd"> ``iobase.Sink`` must implement three methods:</span> |
| <span class="sd"> ``initialize_write()``, ``open_writer()``, and ``finalize_write()``.</span> |
| |
| <span class="sd"> 2. iobase.Writer</span> |
| |
| <span class="sd"> ``iobase.Writer`` is used to write a single bundle of records. An</span> |
| <span class="sd"> ``iobase.Writer`` defines two methods: ``write()`` which writes a</span> |
| <span class="sd"> single record from the bundle and ``close()`` which is called once</span> |
| <span class="sd"> at the end of writing a bundle.</span> |
| |
| <span class="sd"> See also ``apache_beam.io.filebasedsink.FileBasedSink`` which provides a</span> |
| <span class="sd"> simpler API for writing sinks that produce files.</span> |
| |
| <span class="sd"> **Execution of the Write transform**</span> |
| |
| <span class="sd"> ``initialize_write()``, ``pre_finalize()``, and ``finalize_write()`` are</span> |
| <span class="sd"> conceptually called once. However, implementors must</span> |
| <span class="sd"> ensure that these methods are *idempotent*, as they may be called multiple</span> |
| <span class="sd"> times on different machines in the case of failure/retry. A method may be</span> |
| <span class="sd"> called more than once concurrently, in which case it's okay to have a</span> |
| <span class="sd"> transient failure (such as due to a race condition). This failure should not</span> |
| <span class="sd"> prevent subsequent retries from succeeding.</span> |
| |
| <span class="sd"> ``initialize_write()`` should perform any initialization that needs to be done</span> |
| <span class="sd"> prior to writing to the sink. ``initialize_write()`` may return a result</span> |
| <span class="sd"> (let's call this ``init_result``) that contains any parameters it wants to</span> |
| <span class="sd"> pass on to its writers about the sink. For example, a sink that writes to a</span> |
| <span class="sd"> file system may return an ``init_result`` that contains a dynamically</span> |
| <span class="sd"> generated unique directory to which data should be written.</span> |
| |
| <span class="sd"> To perform writing of a bundle of elements, Dataflow execution engine will</span> |
| <span class="sd"> create an ``iobase.Writer`` using the implementation of</span> |
| <span class="sd"> ``iobase.Sink.open_writer()``. When invoking ``open_writer()`` execution</span> |
| <span class="sd"> engine will provide the ``init_result`` returned by ``initialize_write()``</span> |
| <span class="sd"> invocation as well as a *bundle id* (let's call this ``bundle_id``) that is</span> |
| <span class="sd"> unique for each invocation of ``open_writer()``.</span> |
| |
| <span class="sd"> Execution engine will then invoke ``iobase.Writer.write()`` implementation for</span> |
| <span class="sd"> each element that has to be written. Once all elements of a bundle are</span> |
| <span class="sd"> written, execution engine will invoke ``iobase.Writer.close()`` implementation</span> |
| <span class="sd"> which should return a result (let's call this ``write_result``) that contains</span> |
| <span class="sd"> information that encodes the result of the write and, in most cases, some</span> |
| <span class="sd"> encoding of the unique bundle id. For example, if each bundle is written to a</span> |
| <span class="sd"> unique temporary file, ``close()`` method may return an object that contains</span> |
| <span class="sd"> the temporary file name. After writing of all bundles is complete, execution</span> |
| <span class="sd"> engine will invoke ``pre_finalize()`` and then ``finalize_write()``</span> |
| <span class="sd"> implementation.</span> |
| |
| <span class="sd"> The execution of a write transform can be illustrated using the following pseudo</span> |
| <span class="sd"> code (assume that the outer for loop happens in parallel across many</span> |
| <span class="sd"> machines)::</span> |
| |
| <span class="sd"> init_result = sink.initialize_write()</span> |
| <span class="sd"> write_results = []</span> |
| <span class="sd"> for bundle in partition(pcoll):</span> |
| <span class="sd"> writer = sink.open_writer(init_result, generate_bundle_id())</span> |
| <span class="sd"> for elem in bundle:</span> |
| <span class="sd"> writer.write(elem)</span> |
| <span class="sd"> write_results.append(writer.close())</span> |
| <span class="sd"> pre_finalize_result = sink.pre_finalize(init_result, write_results)</span> |
| <span class="sd"> sink.finalize_write(init_result, write_results, pre_finalize_result)</span> |
| |
| |
| <span class="sd"> **init_result**</span> |
| |
| <span class="sd"> Methods of 'iobase.Sink' should agree on the 'init_result' type that will be</span> |
| <span class="sd"> returned when initializing the sink. This type can be a client-defined object</span> |
| <span class="sd"> or an existing type. The returned type must be picklable using Dataflow coder</span> |
| <span class="sd"> ``coders.PickleCoder``. Returning an init_result is optional.</span> |
| |
| <span class="sd"> **bundle_id**</span> |
| |
| <span class="sd"> In order to ensure fault-tolerance, a bundle may be executed multiple times</span> |
| <span class="sd"> (e.g., in the event of failure/retry or for redundancy). However, exactly one</span> |
| <span class="sd"> of these executions will have its result passed to the</span> |
| <span class="sd"> ``iobase.Sink.finalize_write()`` method. Each call to</span> |
| <span class="sd"> ``iobase.Sink.open_writer()`` is passed a unique bundle id when it is called</span> |
| <span class="sd"> by the ``WriteImpl`` transform, so even redundant or retried bundles will have</span> |
| <span class="sd"> a unique way of identifying their output.</span> |
| |
| <span class="sd"> The bundle id should be used to guarantee that a bundle's output is unique.</span> |
| <span class="sd"> This uniqueness guarantee is important; if a bundle is to be output to a file,</span> |
| <span class="sd"> for example, the name of the file must be unique to avoid conflicts with other</span> |
| <span class="sd"> writers. The bundle id should be encoded in the writer result returned by the</span> |
| <span class="sd"> writer and subsequently used by the ``finalize_write()`` method to identify</span> |
| <span class="sd"> the results of successful writes.</span> |
| |
| <span class="sd"> For example, consider the scenario where a Writer writes files containing</span> |
| <span class="sd"> serialized records and the ``finalize_write()`` is to merge or rename these</span> |
| <span class="sd"> output files. In this case, a writer may use its unique id to name its output</span> |
| <span class="sd"> file (to avoid conflicts) and return the name of the file it wrote as its</span> |
| <span class="sd"> writer result. The ``finalize_write()`` will then receive an ``Iterable`` of</span> |
| <span class="sd"> output file names that it can then merge or rename using some bundle naming</span> |
| <span class="sd"> scheme.</span> |
| |
| <span class="sd"> **write_result**</span> |
| |
| <span class="sd"> ``iobase.Writer.close()`` and ``finalize_write()`` implementations must agree</span> |
| <span class="sd"> on the type of the ``write_result`` object returned when invoking</span> |
| <span class="sd"> ``iobase.Writer.close()``. This type can be a client-defined object or</span> |
| <span class="sd"> an existing type. The returned type must be picklable using Dataflow coder</span> |
| <span class="sd"> ``coders.PickleCoder``. Returning a ``write_result`` when</span> |
| <span class="sd"> ``iobase.Writer.close()`` is invoked is optional but if unique</span> |
| <span class="sd"> ``write_result`` objects are not returned, sink should, guarantee idempotency</span> |
| <span class="sd"> when same bundle is written multiple times due to failure/retry or redundancy.</span> |
| |
| |
| <span class="sd"> **More information**</span> |
| |
| <span class="sd"> For more information on creating new sinks please refer to the official</span> |
| <span class="sd"> documentation at</span> |
| <span class="sd"> ``https://beam.apache.org/documentation/sdks/python-custom-io#creating-sinks``</span> |
| <span class="sd"> """</span> |
| <span class="c1"># Whether Beam should skip writing any shards if all are empty.</span> |
| <span class="n">skip_if_empty</span> <span class="o">=</span> <span class="kc">False</span> |
| |
| <div class="viewcode-block" id="Sink.initialize_write"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink.initialize_write">[docs]</a> <span class="k">def</span> <span class="nf">initialize_write</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Initializes the sink before writing begins.</span> |
| |
| <span class="sd"> Invoked before any data is written to the sink.</span> |
| |
| |
| <span class="sd"> Please see documentation in ``iobase.Sink`` for an example.</span> |
| |
| <span class="sd"> Returns:</span> |
| <span class="sd"> An object that contains any sink specific state generated by</span> |
| <span class="sd"> initialization. This object will be passed to open_writer() and</span> |
| <span class="sd"> finalize_write() methods.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="Sink.open_writer"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink.open_writer">[docs]</a> <span class="k">def</span> <span class="nf">open_writer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">init_result</span><span class="p">,</span> <span class="n">uid</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Opens a writer for writing a bundle of elements to the sink.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> init_result: the result of initialize_write() invocation.</span> |
| <span class="sd"> uid: a unique identifier generated by the system.</span> |
| <span class="sd"> Returns:</span> |
| <span class="sd"> an ``iobase.Writer`` that can be used to write a bundle of records to the</span> |
| <span class="sd"> current sink.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="Sink.pre_finalize"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink.pre_finalize">[docs]</a> <span class="k">def</span> <span class="nf">pre_finalize</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">init_result</span><span class="p">,</span> <span class="n">writer_results</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Pre-finalization stage for sink.</span> |
| |
| <span class="sd"> Called after all bundle writes are complete and before finalize_write.</span> |
| <span class="sd"> Used to set up and verify filesystem and sink states.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> init_result: the result of ``initialize_write()`` invocation.</span> |
| <span class="sd"> writer_results: an iterable containing results of ``Writer.close()``</span> |
| <span class="sd"> invocations. This will only contain results of successful writes, and</span> |
| <span class="sd"> will only contain the result of a single successful write for a given</span> |
| <span class="sd"> bundle.</span> |
| |
| <span class="sd"> Returns:</span> |
| <span class="sd"> An object that contains any sink specific state generated.</span> |
| <span class="sd"> This object will be passed to finalize_write().</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="Sink.finalize_write"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink.finalize_write">[docs]</a> <span class="k">def</span> <span class="nf">finalize_write</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">init_result</span><span class="p">,</span> <span class="n">writer_results</span><span class="p">,</span> <span class="n">pre_finalize_result</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Finalizes the sink after all data is written to it.</span> |
| |
| <span class="sd"> Given the result of initialization and an iterable of results from bundle</span> |
| <span class="sd"> writes, performs finalization after writing and closes the sink. Called</span> |
| <span class="sd"> after all bundle writes are complete.</span> |
| |
| <span class="sd"> The bundle write results that are passed to finalize are those returned by</span> |
| <span class="sd"> bundles that completed successfully. Although bundles may have been run</span> |
| <span class="sd"> multiple times (for fault-tolerance), only one writer result will be passed</span> |
| <span class="sd"> to finalize for each bundle. An implementation of finalize should perform</span> |
| <span class="sd"> clean up of any failed and successfully retried bundles. Note that these</span> |
| <span class="sd"> failed bundles will not have their writer result passed to finalize, so</span> |
| <span class="sd"> finalize should be capable of locating any temporary/partial output written</span> |
| <span class="sd"> by failed bundles.</span> |
| |
| <span class="sd"> If all retries of a bundle fail, the whole pipeline will fail *without*</span> |
| <span class="sd"> finalize_write() being invoked.</span> |
| |
| <span class="sd"> A best practice is to make finalize atomic. If this is impossible given the</span> |
| <span class="sd"> semantics of the sink, finalize should be idempotent, as it may be called</span> |
| <span class="sd"> multiple times in the case of failure/retry or for redundancy.</span> |
| |
| <span class="sd"> Note that the iteration order of the writer results is not guaranteed to be</span> |
| <span class="sd"> consistent if finalize is called multiple times.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> init_result: the result of ``initialize_write()`` invocation.</span> |
| <span class="sd"> writer_results: an iterable containing results of ``Writer.close()``</span> |
| <span class="sd"> invocations. This will only contain results of successful writes, and</span> |
| <span class="sd"> will only contain the result of a single successful write for a given</span> |
| <span class="sd"> bundle.</span> |
| <span class="sd"> pre_finalize_result: the result of ``pre_finalize()`` invocation.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div> |
| |
| |
| <div class="viewcode-block" id="Writer"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Writer">[docs]</a><span class="k">class</span> <span class="nc">Writer</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""This class is deprecated, no backwards-compatibility guarantees.</span> |
| |
| <span class="sd"> Writes a bundle of elements from a ``PCollection`` to a sink.</span> |
| |
| <span class="sd"> A Writer's ``iobase.Writer.write()`` writes elements to the sink, while</span> |
| <span class="sd"> ``iobase.Writer.close()`` is called after all elements in the bundle have been</span> |
| <span class="sd"> written.</span> |
| |
| <span class="sd"> See ``iobase.Sink`` for more detailed documentation about the process of</span> |
| <span class="sd"> writing to a sink.</span> |
| <span class="sd"> """</span> |
| <div class="viewcode-block" id="Writer.write"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Writer.write">[docs]</a> <span class="k">def</span> <span class="nf">write</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Writes a value to the sink using the current writer.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="Writer.close"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Writer.close">[docs]</a> <span class="k">def</span> <span class="nf">close</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Closes the current writer.</span> |
| |
| <span class="sd"> Please see documentation in ``iobase.Sink`` for an example.</span> |
| |
| <span class="sd"> Returns:</span> |
| <span class="sd"> An object representing the writes that were performed by the current</span> |
| <span class="sd"> writer.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="Writer.at_capacity"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Writer.at_capacity">[docs]</a> <span class="k">def</span> <span class="nf">at_capacity</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="nb">bool</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""Returns whether this writer should be considered at capacity</span> |
| <span class="sd"> and a new one should be created.</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="kc">False</span></div></div> |
| |
| |
| <div class="viewcode-block" id="Read"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read">[docs]</a><span class="k">class</span> <span class="nc">Read</span><span class="p">(</span><span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A transform that reads a PCollection."""</span> |
| <span class="c1"># Import runners here to prevent circular imports</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.pipeline_context</span> <span class="kn">import</span> <span class="n">PipelineContext</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source</span><span class="p">):</span> |
| <span class="c1"># type: (SourceBase) -> None</span> |
| |
| <span class="w"> </span><span class="sd">"""Initializes a Read transform.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> source: Data source to read from.</span> |
| <span class="sd"> """</span> |
| <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">source</span> <span class="o">=</span> <span class="n">source</span> |
| |
| <div class="viewcode-block" id="Read.get_desired_chunk_size"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.get_desired_chunk_size">[docs]</a> <span class="nd">@staticmethod</span> |
| <span class="k">def</span> <span class="nf">get_desired_chunk_size</span><span class="p">(</span><span class="n">total_size</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">total_size</span><span class="p">:</span> |
| <span class="c1"># 1MB = 1 shard, 1GB = 32 shards, 1TB = 1000 shards, 1PB = 32k shards</span> |
| <span class="n">chunk_size</span> <span class="o">=</span> <span class="nb">max</span><span class="p">(</span><span class="mi">1</span> <span class="o"><<</span> <span class="mi">20</span><span class="p">,</span> <span class="mi">1000</span> <span class="o">*</span> <span class="nb">int</span><span class="p">(</span><span class="n">math</span><span class="o">.</span><span class="n">sqrt</span><span class="p">(</span><span class="n">total_size</span><span class="p">)))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">chunk_size</span> <span class="o">=</span> <span class="mi">64</span> <span class="o"><<</span> <span class="mi">20</span> <span class="c1"># 64mb</span> |
| <span class="k">return</span> <span class="n">chunk_size</span></div> |
| |
| <div class="viewcode-block" id="Read.expand"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pbegin</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">BoundedSource</span><span class="p">):</span> |
| <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">register_coder</span><span class="p">(</span><span class="n">BoundedSource</span><span class="p">,</span> <span class="n">_MemoizingPickleCoder</span><span class="p">)</span> |
| <span class="n">display_data</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">display_data</span><span class="p">()</span> <span class="ow">or</span> <span class="p">{}</span> |
| <span class="n">display_data</span><span class="p">[</span><span class="s1">'source'</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="vm">__class__</span> |
| |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="n">pbegin</span> |
| <span class="o">|</span> <span class="n">Impulse</span><span class="p">()</span> |
| <span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">)</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">BoundedSource</span><span class="p">)</span> |
| <span class="o">|</span> <span class="n">SDFBoundedSourceReader</span><span class="p">(</span><span class="n">display_data</span><span class="p">))</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="c1"># The Read transform can also admit a full PTransform as an input</span> |
| <span class="c1"># rather than an actual source. If the input is a PTransform, then</span> |
| <span class="c1"># just apply it directly.</span> |
| <span class="k">return</span> <span class="n">pbegin</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># Treat Read itself as a primitive.</span> |
| <span class="k">return</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">(</span> |
| <span class="n">pbegin</span><span class="o">.</span><span class="n">pipeline</span><span class="p">,</span> <span class="n">is_bounded</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">is_bounded</span><span class="p">())</span></div> |
| |
| <div class="viewcode-block" id="Read.get_windowing"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.get_windowing">[docs]</a> <span class="k">def</span> <span class="nf">get_windowing</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_inputs</span><span class="p">):</span> |
| <span class="c1"># type: (...) -> core.Windowing</span> |
| <span class="k">return</span> <span class="n">core</span><span class="o">.</span><span class="n">Windowing</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span></div> |
| |
| <span class="k">def</span> <span class="nf">_infer_output_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">input_coder</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="c1"># type: (...) -> Optional[coders.Coder]</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.dataflow.native_io</span> <span class="kn">import</span> <span class="n">iobase</span> <span class="k">as</span> <span class="n">dataflow_io</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">BoundedSource</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">default_output_coder</span><span class="p">()</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">dataflow_io</span><span class="o">.</span><span class="n">NativeSource</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">coder</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="kc">None</span> |
| |
| <div class="viewcode-block" id="Read.display_data"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="s1">'source'</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s1">'Read Source'</span><span class="p">),</span> |
| <span class="s1">'source_dd'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span> |
| <span class="p">}</span></div> |
| |
| <div class="viewcode-block" id="Read.to_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.to_runner_api_parameter">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">context</span><span class="p">:</span> <span class="n">PipelineContext</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">Tuple</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.dataflow.native_io</span> <span class="kn">import</span> <span class="n">iobase</span> <span class="k">as</span> <span class="n">dataflow_io</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="p">(</span><span class="n">BoundedSource</span><span class="p">,</span> <span class="n">dataflow_io</span><span class="o">.</span><span class="n">NativeSource</span><span class="p">)):</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.pubsub</span> <span class="kn">import</span> <span class="n">_PubSubSource</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">_PubSubSource</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">PUBSUB_READ</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> |
| <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubReadPayload</span><span class="p">(</span> |
| <span class="n">topic</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">full_topic</span><span class="p">,</span> |
| <span class="n">subscription</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">full_subscription</span><span class="p">,</span> |
| <span class="n">timestamp_attribute</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">timestamp_attribute</span><span class="p">,</span> |
| <span class="n">with_attributes</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">with_attributes</span><span class="p">,</span> |
| <span class="n">id_attribute</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">id_label</span><span class="p">))</span> |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="n">common_urns</span><span class="o">.</span><span class="n">deprecated_primitives</span><span class="o">.</span><span class="n">READ</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> |
| <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ReadPayload</span><span class="p">(</span> |
| <span class="n">source</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">(</span><span class="n">context</span><span class="p">),</span> |
| <span class="n">is_bounded</span><span class="o">=</span><span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">IsBounded</span><span class="o">.</span><span class="n">BOUNDED</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">is_bounded</span><span class="p">()</span> <span class="k">else</span> |
| <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">IsBounded</span><span class="o">.</span><span class="n">UNBOUNDED</span><span class="p">))</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">to_runner_api_parameter</span><span class="p">(</span><span class="n">context</span><span class="p">)</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span> |
| <span class="s2">"to_runner_api_parameter not "</span> |
| <span class="s2">"implemented for type"</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="Read.from_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.from_runner_api_parameter">[docs]</a> <span class="nd">@staticmethod</span> |
| <span class="k">def</span> <span class="nf">from_runner_api_parameter</span><span class="p">(</span> |
| <span class="n">transform</span><span class="p">:</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PTransform</span><span class="p">,</span> |
| <span class="n">payload</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ReadPayload</span><span class="p">,</span> |
| <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubReadPayload</span><span class="p">],</span> |
| <span class="n">context</span><span class="p">:</span> <span class="n">PipelineContext</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="s2">"Read"</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">transform</span><span class="o">.</span><span class="n">spec</span><span class="o">.</span><span class="n">urn</span> <span class="o">==</span> <span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">PUBSUB_READ</span><span class="o">.</span><span class="n">urn</span><span class="p">:</span> |
| <span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">payload</span><span class="p">,</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubReadPayload</span><span class="p">)</span> |
| <span class="c1"># Importing locally to prevent circular dependencies.</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.pubsub</span> <span class="kn">import</span> <span class="n">_PubSubSource</span> |
| <span class="n">source</span> <span class="o">=</span> <span class="n">_PubSubSource</span><span class="p">(</span> |
| <span class="n">topic</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">topic</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">subscription</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">subscription</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">id_label</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">id_attribute</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">with_attributes</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">with_attributes</span><span class="p">,</span> |
| <span class="n">timestamp_attribute</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">timestamp_attribute</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">Read</span><span class="p">(</span><span class="n">source</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">payload</span><span class="p">,</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ReadPayload</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">Read</span><span class="p">(</span><span class="n">SourceBase</span><span class="o">.</span><span class="n">from_runner_api</span><span class="p">(</span><span class="n">payload</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">context</span><span class="p">))</span></div> |
| |
| <span class="nd">@staticmethod</span> |
| <span class="k">def</span> <span class="nf">_from_runner_api_parameter_read</span><span class="p">(</span> |
| <span class="n">transform</span><span class="p">:</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PTransform</span><span class="p">,</span> |
| <span class="n">payload</span><span class="p">:</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ReadPayload</span><span class="p">,</span> |
| <span class="n">context</span><span class="p">:</span> <span class="n">PipelineContext</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="s2">"Read"</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""Method for type proxying when calling register_urn due to limitations</span> |
| <span class="sd"> in type exprs in Python"""</span> |
| <span class="k">return</span> <span class="n">Read</span><span class="o">.</span><span class="n">from_runner_api_parameter</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="n">payload</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span> |
| |
| <span class="nd">@staticmethod</span> |
| <span class="k">def</span> <span class="nf">_from_runner_api_parameter_pubsub_read</span><span class="p">(</span> |
| <span class="n">transform</span><span class="p">:</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PTransform</span><span class="p">,</span> |
| <span class="n">payload</span><span class="p">:</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubReadPayload</span><span class="p">,</span> |
| <span class="n">context</span><span class="p">:</span> <span class="n">PipelineContext</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="s2">"Read"</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""Method for type proxying when calling register_urn due to limitations</span> |
| <span class="sd"> in type exprs in Python"""</span> |
| <span class="k">return</span> <span class="n">Read</span><span class="o">.</span><span class="n">from_runner_api_parameter</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="n">payload</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span></div> |
| |
| |
| <span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span> |
| <span class="n">common_urns</span><span class="o">.</span><span class="n">deprecated_primitives</span><span class="o">.</span><span class="n">READ</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> |
| <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ReadPayload</span><span class="p">,</span> |
| <span class="n">Read</span><span class="o">.</span><span class="n">_from_runner_api_parameter_read</span><span class="p">,</span> |
| <span class="p">)</span> |
| |
| <span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span> |
| <span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">PUBSUB_READ</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> |
| <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubReadPayload</span><span class="p">,</span> |
| <span class="n">Read</span><span class="o">.</span><span class="n">_from_runner_api_parameter_pubsub_read</span><span class="p">,</span> |
| <span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="Write"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Write">[docs]</a><span class="k">class</span> <span class="nc">Write</span><span class="p">(</span><span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A ``PTransform`` that writes to a sink.</span> |
| |
| <span class="sd"> A sink should inherit ``iobase.Sink``. Such implementations are</span> |
| <span class="sd"> handled using a composite transform that consists of three ``ParDo``s -</span> |
| <span class="sd"> (1) a ``ParDo`` performing a global initialization (2) a ``ParDo`` performing</span> |
| <span class="sd"> a parallel write and (3) a ``ParDo`` performing a global finalization. In the</span> |
| <span class="sd"> case of an empty ``PCollection``, only the global initialization and</span> |
| <span class="sd"> finalization will be performed. Currently only batch workflows support custom</span> |
| <span class="sd"> sinks.</span> |
| |
| <span class="sd"> Example usage::</span> |
| |
| <span class="sd"> pcollection | beam.io.Write(MySink())</span> |
| |
| <span class="sd"> This returns a ``pvalue.PValue`` object that represents the end of the</span> |
| <span class="sd"> Pipeline.</span> |
| |
| <span class="sd"> The sink argument may also be a full PTransform, in which case it will be</span> |
| <span class="sd"> applied directly. This allows composite sink-like transforms (e.g. a sink</span> |
| <span class="sd"> with some pre-processing DoFns) to be used the same as all other sinks.</span> |
| |
| <span class="sd"> This transform also supports sinks that inherit ``iobase.NativeSink``. These</span> |
| <span class="sd"> are sinks that are implemented natively by the Dataflow service and hence</span> |
| <span class="sd"> should not be updated by users. These sinks are processed using a Dataflow</span> |
| <span class="sd"> native write transform.</span> |
| <span class="sd"> """</span> |
| <span class="c1"># Import runners here to prevent circular imports</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.pipeline_context</span> <span class="kn">import</span> <span class="n">PipelineContext</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sink</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Initializes a Write transform.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> sink: Data sink to write to.</span> |
| <span class="sd"> """</span> |
| <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sink</span> <span class="o">=</span> <span class="n">sink</span> |
| |
| <div class="viewcode-block" id="Write.display_data"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Write.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">{</span><span class="s1">'sink'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span> <span class="s1">'sink_dd'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">}</span></div> |
| |
| <div class="viewcode-block" id="Write.expand"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Write.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.dataflow.native_io</span> <span class="kn">import</span> <span class="n">iobase</span> <span class="k">as</span> <span class="n">dataflow_io</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="n">dataflow_io</span><span class="o">.</span><span class="n">NativeSink</span><span class="p">):</span> |
| <span class="c1"># A native sink</span> |
| <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="s1">'NativeWrite'</span> <span class="o">>></span> <span class="n">dataflow_io</span><span class="o">.</span><span class="n">_NativeWrite</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="n">Sink</span><span class="p">):</span> |
| <span class="c1"># A custom sink</span> |
| <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">WriteImpl</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="c1"># This allows "composite" sinks to be used like non-composite ones.</span> |
| <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'A sink must inherit iobase.Sink, iobase.NativeSink, '</span> |
| <span class="s1">'or be a PTransform. Received : </span><span class="si">%r</span><span class="s1">'</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="Write.to_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Write.to_runner_api_parameter">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">context</span><span class="p">:</span> <span class="n">PipelineContext</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">Tuple</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span> |
| <span class="c1"># Importing locally to prevent circular dependencies.</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.pubsub</span> <span class="kn">import</span> <span class="n">_PubSubSink</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="n">_PubSubSink</span><span class="p">):</span> |
| <span class="n">payload</span> <span class="o">=</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubWritePayload</span><span class="p">(</span> |
| <span class="n">topic</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">full_topic</span><span class="p">,</span> |
| <span class="n">id_attribute</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">id_label</span><span class="p">,</span> |
| <span class="n">timestamp_attribute</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">timestamp_attribute</span><span class="p">)</span> |
| <span class="k">return</span> <span class="p">(</span><span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">PUBSUB_WRITE</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> <span class="n">payload</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="n">to_runner_api_parameter</span><span class="p">(</span><span class="n">context</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="Write.from_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Write.from_runner_api_parameter">[docs]</a> <span class="nd">@staticmethod</span> |
| <span class="nd">@ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span> |
| <span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">PUBSUB_WRITE</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> |
| <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubWritePayload</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">from_runner_api_parameter</span><span class="p">(</span> |
| <span class="n">ptransform</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> |
| <span class="n">payload</span><span class="p">:</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubWritePayload</span><span class="p">,</span> |
| <span class="n">unused_context</span><span class="p">:</span> <span class="n">PipelineContext</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="s2">"Write"</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">spec</span><span class="o">.</span><span class="n">urn</span> <span class="o">!=</span> <span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">PUBSUB_WRITE</span><span class="o">.</span><span class="n">urn</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'Write transform cannot be constructed for the given proto </span><span class="si">%r</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">ptransform</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">payload</span><span class="o">.</span><span class="n">topic</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span> |
| <span class="s2">"from_runner_api_parameter does not "</span> |
| <span class="s2">"handle empty or None topic"</span><span class="p">)</span> |
| |
| <span class="c1"># Importing locally to prevent circular dependencies.</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.pubsub</span> <span class="kn">import</span> <span class="n">_PubSubSink</span> |
| <span class="n">sink</span> <span class="o">=</span> <span class="n">_PubSubSink</span><span class="p">(</span> |
| <span class="n">topic</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">topic</span><span class="p">,</span> |
| <span class="n">id_label</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">id_attribute</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">timestamp_attribute</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">timestamp_attribute</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">Write</span><span class="p">(</span><span class="n">sink</span><span class="p">)</span></div></div> |
| |
| |
| <span class="k">class</span> <span class="nc">WriteImpl</span><span class="p">(</span><span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Implements the writing of custom sinks."""</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sink</span><span class="p">):</span> |
| <span class="c1"># type: (Sink) -> None</span> |
| <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sink</span> <span class="o">=</span> <span class="n">sink</span> |
| |
| <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="n">do_once</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">|</span> <span class="s1">'DoOnce'</span> <span class="o">>></span> <span class="n">core</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span> |
| <span class="n">init_result_coll</span> <span class="o">=</span> <span class="n">do_once</span> <span class="o">|</span> <span class="s1">'InitializeWrite'</span> <span class="o">>></span> <span class="n">core</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span> |
| <span class="k">lambda</span> <span class="n">_</span><span class="p">,</span> <span class="n">sink</span><span class="p">:</span> <span class="n">sink</span><span class="o">.</span><span class="n">initialize_write</span><span class="p">(),</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">)</span> |
| <span class="k">if</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="s1">'num_shards'</span><span class="p">,</span> <span class="mi">0</span><span class="p">):</span> |
| <span class="n">min_shards</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">num_shards</span> |
| <span class="k">if</span> <span class="n">min_shards</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span> |
| <span class="n">keyed_pcoll</span> <span class="o">=</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">x</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">keyed_pcoll</span> <span class="o">=</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">_RoundRobinKeyFn</span><span class="p">(),</span> <span class="n">count</span><span class="o">=</span><span class="n">min_shards</span><span class="p">)</span> |
| <span class="n">write_result_coll</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">keyed_pcoll</span> |
| <span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span> |
| <span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span> |
| <span class="o">|</span> <span class="s1">'WriteBundles'</span> <span class="o">>></span> <span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">_WriteKeyedBundleDoFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">),</span> <span class="n">AsSingleton</span><span class="p">(</span><span class="n">init_result_coll</span><span class="p">)))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">min_shards</span> <span class="o">=</span> <span class="mi">1</span> |
| <span class="n">write_result_coll</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">pcoll</span> |
| <span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span> |
| <span class="o">|</span> <span class="s1">'WriteBundles'</span> <span class="o">>></span> <span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">_WriteBundleDoFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">),</span> <span class="n">AsSingleton</span><span class="p">(</span><span class="n">init_result_coll</span><span class="p">))</span> |
| <span class="o">|</span> <span class="s1">'Pair'</span> <span class="o">>></span> <span class="n">core</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">x</span><span class="p">))</span> |
| <span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span> |
| <span class="o">|</span> <span class="s1">'Extract'</span> <span class="o">>></span> <span class="n">core</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">]))</span> |
| <span class="c1"># PreFinalize should run before FinalizeWrite, and the two should not be</span> |
| <span class="c1"># fused.</span> |
| <span class="n">pre_finalize_coll</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">do_once</span> |
| <span class="o">|</span> <span class="s1">'PreFinalize'</span> <span class="o">>></span> <span class="n">core</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span> |
| <span class="n">_pre_finalize</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> |
| <span class="n">AsSingleton</span><span class="p">(</span><span class="n">init_result_coll</span><span class="p">),</span> |
| <span class="n">AsIter</span><span class="p">(</span><span class="n">write_result_coll</span><span class="p">)))</span> |
| <span class="k">return</span> <span class="n">do_once</span> <span class="o">|</span> <span class="s1">'FinalizeWrite'</span> <span class="o">>></span> <span class="n">core</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span> |
| <span class="n">_finalize_write</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> |
| <span class="n">AsSingleton</span><span class="p">(</span><span class="n">init_result_coll</span><span class="p">),</span> |
| <span class="n">AsIter</span><span class="p">(</span><span class="n">write_result_coll</span><span class="p">),</span> |
| <span class="n">min_shards</span><span class="p">,</span> |
| <span class="n">AsSingleton</span><span class="p">(</span><span class="n">pre_finalize_coll</span><span class="p">))</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="nb">str</span><span class="p">)</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_WriteBundleDoFn</span><span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A DoFn for writing elements to an iobase.Writer.</span> |
| <span class="sd"> Opens a writer at the first element and closes the writer at finish_bundle().</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sink</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sink</span> <span class="o">=</span> <span class="n">sink</span> |
| |
| <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">{</span><span class="s1">'sink_dd'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">}</span> |
| |
| <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">init_result</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="c1"># We ignore UUID collisions here since they are extremely rare.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">open_writer</span><span class="p">(</span><span class="n">init_result</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">()))</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">element</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="o">.</span><span class="n">at_capacity</span><span class="p">():</span> |
| <span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">WindowedValue</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">(),</span> |
| <span class="n">window</span><span class="o">.</span><span class="n">GlobalWindow</span><span class="p">()</span><span class="o">.</span><span class="n">max_timestamp</span><span class="p">(),</span> <span class="p">[</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindow</span><span class="p">()])</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_WriteKeyedBundleDoFn</span><span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sink</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sink</span> <span class="o">=</span> <span class="n">sink</span> |
| |
| <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">{</span><span class="s1">'sink_dd'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">}</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">init_result</span><span class="p">):</span> |
| <span class="n">bundle</span> <span class="o">=</span> <span class="n">element</span> |
| <span class="n">writer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">open_writer</span><span class="p">(</span><span class="n">init_result</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">()))</span> |
| <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">bundle</span><span class="p">[</span><span class="mi">1</span><span class="p">]:</span> <span class="c1"># values</span> |
| <span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> |
| <span class="k">return</span> <span class="p">[</span><span class="n">window</span><span class="o">.</span><span class="n">TimestampedValue</span><span class="p">(</span><span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">(),</span> <span class="n">timestamp</span><span class="o">.</span><span class="n">MAX_TIMESTAMP</span><span class="p">)]</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_pre_finalize</span><span class="p">(</span><span class="n">unused_element</span><span class="p">,</span> <span class="n">sink</span><span class="p">,</span> <span class="n">init_result</span><span class="p">,</span> <span class="n">write_results</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">sink</span><span class="o">.</span><span class="n">pre_finalize</span><span class="p">(</span><span class="n">init_result</span><span class="p">,</span> <span class="n">write_results</span><span class="p">)</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_finalize_write</span><span class="p">(</span> |
| <span class="n">unused_element</span><span class="p">,</span> |
| <span class="n">sink</span><span class="p">,</span> |
| <span class="n">init_result</span><span class="p">,</span> |
| <span class="n">write_results</span><span class="p">,</span> |
| <span class="n">min_shards</span><span class="p">,</span> |
| <span class="n">pre_finalize_results</span><span class="p">):</span> |
| <span class="n">write_results</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">write_results</span><span class="p">)</span> |
| <span class="n">extra_shards</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">write_results</span><span class="p">)</span> <span class="o"><</span> <span class="n">min_shards</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">write_results</span> <span class="ow">or</span> <span class="ow">not</span> <span class="n">sink</span><span class="o">.</span><span class="n">skip_if_empty</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span> |
| <span class="s1">'Creating </span><span class="si">%s</span><span class="s1"> empty shard(s).'</span><span class="p">,</span> <span class="n">min_shards</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="n">write_results</span><span class="p">))</span> |
| <span class="k">for</span> <span class="n">_</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="n">min_shards</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="n">write_results</span><span class="p">)):</span> |
| <span class="n">writer</span> <span class="o">=</span> <span class="n">sink</span><span class="o">.</span><span class="n">open_writer</span><span class="p">(</span><span class="n">init_result</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">()))</span> |
| <span class="n">extra_shards</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">())</span> |
| <span class="n">outputs</span> <span class="o">=</span> <span class="n">sink</span><span class="o">.</span><span class="n">finalize_write</span><span class="p">(</span> |
| <span class="n">init_result</span><span class="p">,</span> <span class="n">write_results</span> <span class="o">+</span> <span class="n">extra_shards</span><span class="p">,</span> <span class="n">pre_finalize_results</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">outputs</span><span class="p">:</span> |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="n">window</span><span class="o">.</span><span class="n">TimestampedValue</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">.</span><span class="n">MAX_TIMESTAMP</span><span class="p">)</span> <span class="k">for</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">outputs</span><span class="p">)</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_RoundRobinKeyFn</span><span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">counter</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">count</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">counter</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">counter</span> <span class="o">=</span> <span class="n">random</span><span class="o">.</span><span class="n">randrange</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="n">count</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">counter</span> <span class="o">=</span> <span class="p">(</span><span class="mi">1</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">counter</span><span class="p">)</span> <span class="o">%</span> <span class="n">count</span> |
| <span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">counter</span><span class="p">,</span> <span class="n">element</span> |
| |
| |
| <div class="viewcode-block" id="RestrictionTracker"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker">[docs]</a><span class="k">class</span> <span class="nc">RestrictionTracker</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Manages access to a restriction.</span> |
| |
| <span class="sd"> Keeps track of the claimed part of the restriction for a Splittable DoFn.</span> |
| |
| <span class="sd"> The restriction may be modified by different threads; however, the system will</span> |
| <span class="sd"> ensure sufficient locking such that no methods on the restriction tracker</span> |
| <span class="sd"> will be called concurrently.</span> |
| |
| <span class="sd"> See the following documents for more details.</span> |
| <span class="sd"> * https://s.apache.org/splittable-do-fn</span> |
| <span class="sd"> * https://s.apache.org/splittable-do-fn-python-sdk</span> |
| <span class="sd"> """</span> |
| <div class="viewcode-block" id="RestrictionTracker.current_restriction"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.current_restriction">[docs]</a> <span class="k">def</span> <span class="nf">current_restriction</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Returns the current restriction.</span> |
| |
| <span class="sd"> Returns a restriction accurately describing the full range of work the</span> |
| <span class="sd"> current ``DoFn.process()`` call will do, including already completed work.</span> |
| |
| <span class="sd"> The current restriction returned by this method may be updated dynamically</span> |
| <span class="sd"> due to concurrent invocation of other methods of the</span> |
| <span class="sd"> ``RestrictionTracker``, for example, ``split()``.</span> |
| |
| <span class="sd"> This API is required to be implemented.</span> |
| |
| <span class="sd"> Returns: a restriction object.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="RestrictionTracker.current_progress"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.current_progress">[docs]</a> <span class="k">def</span> <span class="nf">current_progress</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> RestrictionProgress</span> |
| |
| <span class="w"> </span><span class="sd">"""Returns a RestrictionProgress object representing the current progress.</span> |
| |
| <span class="sd"> This API is recommended to be implemented. The runner can do a better job</span> |
| <span class="sd"> at parallel processing with better progress signals.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="RestrictionTracker.check_done"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.check_done">[docs]</a> <span class="k">def</span> <span class="nf">check_done</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Checks whether the restriction has been fully processed.</span> |
| |
| <span class="sd"> Called by the SDK harness after the iterator returned by ``DoFn.process()``</span> |
| <span class="sd"> has been fully read.</span> |
| |
| <span class="sd"> This method must raise a `ValueError` if there is still any unclaimed work</span> |
| <span class="sd"> remaining in the restriction when this method is invoked. The exception raised</span> |
| <span class="sd"> must have an informative error message.</span> |
| |
| <span class="sd"> This API is required to be implemented to make sure that no data is lost</span> |
| <span class="sd"> during SDK processing.</span> |
| |
| <span class="sd"> Returns: ``True`` if current restriction has been fully processed.</span> |
| <span class="sd"> Raises:</span> |
| <span class="sd"> ValueError: if there is still any unclaimed work remaining.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="RestrictionTracker.try_split"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.try_split">[docs]</a> <span class="k">def</span> <span class="nf">try_split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fraction_of_remainder</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Splits current restriction based on fraction_of_remainder.</span> |
| |
| <span class="sd"> If splitting the current restriction is possible, the current restriction is</span> |
| <span class="sd"> split into a primary and residual restriction pair. This invocation updates</span> |
| <span class="sd"> the ``current_restriction()`` to be the primary restriction effectively</span> |
| <span class="sd"> having the current ``DoFn.process()`` execution responsible for performing</span> |
| <span class="sd"> the work that the primary restriction represents. The residual restriction</span> |
| <span class="sd"> will be executed in a separate ``DoFn.process()`` invocation (likely in a</span> |
| <span class="sd"> different process). The work performed by executing the primary and residual</span> |
| <span class="sd"> restrictions as separate ``DoFn.process()`` invocations MUST be equivalent</span> |
| <span class="sd"> to the work performed as if this split never occurred.</span> |
| |
| <span class="sd"> The ``fraction_of_remainder`` should be used in a best effort manner to</span> |
| <span class="sd"> choose a primary and residual restriction based upon the fraction of the</span> |
| <span class="sd"> remaining work that the current ``DoFn.process()`` invocation is responsible</span> |
| <span class="sd"> for. For example, if a ``DoFn.process()`` was reading a file with a</span> |
| <span class="sd"> restriction representing the offset range [100, 200) and has processed up to</span> |
| <span class="sd"> offset 130 with a fraction_of_remainder of 0.7, the primary and residual</span> |
| <span class="sd"> restrictions returned would be [100, 179), [179, 200) (note: current_offset</span> |
| <span class="sd"> + fraction_of_remainder * remaining_work = 130 + 0.7 * 70 = 179).</span> |
| |
| <span class="sd"> ``fraction_of_remainder`` = 0 means a checkpoint is required.</span> |
| |
| <span class="sd"> The API is recommended to be implemented for batch pipelines given that it is</span> |
| <span class="sd"> very important for pipeline scaling and end-to-end pipeline execution.</span> |
| |
| <span class="sd"> The API is required to be implemented for a streaming pipeline.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> fraction_of_remainder: A hint as to the fraction of work the primary</span> |
| <span class="sd"> restriction should represent based upon the current known remaining</span> |
| <span class="sd"> amount of work.</span> |
| |
| <span class="sd"> Returns:</span> |
| <span class="sd"> (primary_restriction, residual_restriction) if a split was possible,</span> |
| <span class="sd"> otherwise returns ``None``.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="RestrictionTracker.try_claim"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.try_claim">[docs]</a> <span class="k">def</span> <span class="nf">try_claim</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">position</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Attempts to claim the block of work in the current restriction</span> |
| <span class="sd"> identified by the given position. Each claimed position MUST be a valid</span> |
| <span class="sd"> split point.</span> |
| |
| <span class="sd"> If this succeeds, the DoFn MUST execute the entire block of work. If it</span> |
| <span class="sd"> fails, the ``DoFn.process()`` MUST return ``None`` without performing any</span> |
| <span class="sd"> additional work or emitting output (note that emitting output or performing</span> |
| <span class="sd"> work from ``DoFn.process()`` is also not allowed before the first call of</span> |
| <span class="sd"> this method).</span> |
| |
| <span class="sd"> The API is required to be implemented.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> position: current position that wants to be claimed.</span> |
| |
| <span class="sd"> Returns: ``True`` if the position can be claimed as current_position.</span> |
| <span class="sd"> Otherwise, returns ``False``.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="RestrictionTracker.is_bounded"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.is_bounded">[docs]</a> <span class="k">def</span> <span class="nf">is_bounded</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Returns whether the amount of work represented by the current restriction</span> |
| <span class="sd"> is bounded.</span> |
| |
| <span class="sd"> The boundedness of the restriction is used to determine the default behavior</span> |
| <span class="sd"> of how to truncate restrictions when a pipeline is being</span> |
| <span class="sd"> `drained <https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#>`_. # pylint: disable=line-too-long</span> |
| <span class="sd"> If the restriction is bounded, then the entire restriction will be processed;</span> |
| <span class="sd"> otherwise, the restriction will be processed until a checkpoint is possible.</span> |
| |
| <span class="sd"> The API is required to be implemented.</span> |
| |
| <span class="sd"> Returns: ``True`` if the restriction represents a finite amount of work.</span> |
| <span class="sd"> Otherwise, returns ``False``.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div> |
| |
| |
| <div class="viewcode-block" id="WatermarkEstimator"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.WatermarkEstimator">[docs]</a><span class="k">class</span> <span class="nc">WatermarkEstimator</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A WatermarkEstimator which is used for estimating output_watermark based on</span> |
| <span class="sd"> the timestamp of output records or manual modifications. Please refer to</span> |
| <span class="sd"> ``watermark_estimators`` for commonly used watermark estimators.</span> |
| |
| <span class="sd"> The base class provides common APIs that are called by the framework, which</span> |
| <span class="sd"> are also accessible inside a DoFn.process() body. Derived watermark estimators</span> |
| <span class="sd"> should implement all APIs listed below. Additional methods can be implemented</span> |
| <span class="sd"> and will be available when invoked within a DoFn.</span> |
| |
| <span class="sd"> Internal state must not be updated asynchronously.</span> |
| <span class="sd"> """</span> |
| <div class="viewcode-block" id="WatermarkEstimator.get_estimator_state"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.WatermarkEstimator.get_estimator_state">[docs]</a> <span class="k">def</span> <span class="nf">get_estimator_state</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Get current state of the WatermarkEstimator instance, which can be used</span> |
| <span class="sd"> to recreate the WatermarkEstimator when processing the restriction. See</span> |
| <span class="sd"> WatermarkEstimatorProvider.create_watermark_estimator.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div> |
| |
| <div class="viewcode-block" id="WatermarkEstimator.current_watermark"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.WatermarkEstimator.current_watermark">[docs]</a> <span class="k">def</span> <span class="nf">current_watermark</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> timestamp.Timestamp</span> |
| |
| <span class="w"> </span><span class="sd">"""Return estimated output_watermark. This function must return</span> |
| <span class="sd"> monotonically increasing watermarks."""</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div> |
| |
| <div class="viewcode-block" id="WatermarkEstimator.observe_timestamp"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.WatermarkEstimator.observe_timestamp">[docs]</a> <span class="k">def</span> <span class="nf">observe_timestamp</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">):</span> |
| <span class="c1"># type: (timestamp.Timestamp) -> None</span> |
| |
| <span class="w"> </span><span class="sd">"""Update tracking watermark with latest output timestamp.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> timestamp: the `timestamp.Timestamp` of current output element.</span> |
| |
| <span class="sd"> This is called with the timestamp of every element output from the DoFn.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div></div> |
| |
| |
| <div class="viewcode-block" id="RestrictionProgress"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionProgress">[docs]</a><span class="k">class</span> <span class="nc">RestrictionProgress</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Used to record the progress of a restriction."""</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="c1"># Only accept keyword arguments.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">'fraction'</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_completed</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">'completed'</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">'remaining'</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> |
| <span class="k">assert</span> <span class="ow">not</span> <span class="n">kwargs</span> |
| |
| <span class="k">def</span> <span class="fm">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="s1">'RestrictionProgress(fraction=</span><span class="si">%s</span><span class="s1">, completed=</span><span class="si">%s</span><span class="s1">, remaining=</span><span class="si">%s</span><span class="s1">)'</span> <span class="o">%</span> <span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_completed</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span><span class="p">)</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">completed_work</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> float</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_completed</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_completed</span> |
| <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span> <span class="o">*</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> <span class="o">/</span> <span class="p">(</span><span class="mi">1</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">remaining_work</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> float</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span> |
| <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_completed</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_completed</span> <span class="o">*</span> <span class="p">(</span><span class="mi">1</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span><span class="p">)</span> <span class="o">/</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="mi">1</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">total_work</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> float</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">completed_work</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">remaining_work</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">fraction_completed</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> float</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="nb">float</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_completed</span><span class="p">)</span> <span class="o">/</span> <span class="bp">self</span><span class="o">.</span><span class="n">total_work</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">fraction_remaining</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> float</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="mi">1</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="nb">float</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span><span class="p">)</span> <span class="o">/</span> <span class="bp">self</span><span class="o">.</span><span class="n">total_work</span> |
| |
| <div class="viewcode-block" id="RestrictionProgress.with_completed"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionProgress.with_completed">[docs]</a> <span class="k">def</span> <span class="nf">with_completed</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">completed</span><span class="p">):</span> |
| <span class="c1"># type: (int) -> RestrictionProgress</span> |
| <span class="k">return</span> <span class="n">RestrictionProgress</span><span class="p">(</span> |
| <span class="n">fraction</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span><span class="p">,</span> <span class="n">remaining</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span><span class="p">,</span> <span class="n">completed</span><span class="o">=</span><span class="n">completed</span><span class="p">)</span></div></div> |
| |
| |
| <span class="k">class</span> <span class="nc">_SDFBoundedSourceRestriction</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">""" A restriction wraps SourceBundle and RangeTracker. """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source_bundle</span><span class="p">,</span> <span class="n">range_tracker</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span> <span class="o">=</span> <span class="n">source_bundle</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_range_tracker</span> <span class="o">=</span> <span class="n">range_tracker</span> |
| |
| <span class="k">def</span> <span class="nf">__reduce__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># The instance of RangeTracker shouldn't be serialized.</span> |
| <span class="k">return</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="p">,</span> <span class="p">))</span> |
| |
| <span class="k">def</span> <span class="nf">range_tracker</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_range_tracker</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_range_tracker</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">get_range_tracker</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">start_position</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">stop_position</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_range_tracker</span> |
| |
| <span class="k">def</span> <span class="nf">weight</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">weight</span> |
| |
| <span class="k">def</span> <span class="nf">source</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">source</span> |
| |
| <span class="k">def</span> <span class="nf">try_split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fraction_of_remainder</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">consumed_fraction</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">fraction_consumed</span><span class="p">()</span> |
| <span class="n">fraction</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">consumed_fraction</span> <span class="o">+</span> <span class="p">(</span><span class="mi">1</span> <span class="o">-</span> <span class="n">consumed_fraction</span><span class="p">)</span> <span class="o">*</span> <span class="n">fraction_of_remainder</span><span class="p">)</span> |
| <span class="n">position</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">position_at_fraction</span><span class="p">(</span><span class="n">fraction</span><span class="p">)</span> |
| <span class="c1"># Need to stash current stop_pos before splitting since</span> |
| <span class="c1"># range_tracker.split will update its stop_pos if splits</span> |
| <span class="c1"># successfully.</span> |
| <span class="n">stop_pos</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">stop_position</span> |
| <span class="n">split_result</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">try_split</span><span class="p">(</span><span class="n">position</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">split_result</span><span class="p">:</span> |
| <span class="n">split_pos</span><span class="p">,</span> <span class="n">split_fraction</span> <span class="o">=</span> <span class="n">split_result</span> |
| <span class="n">primary_weight</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">weight</span> <span class="o">*</span> <span class="n">split_fraction</span> |
| <span class="n">residual_weight</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">weight</span> <span class="o">-</span> <span class="n">primary_weight</span> |
| <span class="c1"># Update self to primary weight and end position.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span> <span class="o">=</span> <span class="n">SourceBundle</span><span class="p">(</span> |
| <span class="n">primary_weight</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">start_position</span><span class="p">,</span> |
| <span class="n">split_pos</span><span class="p">)</span> |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">_SDFBoundedSourceRestriction</span><span class="p">(</span> |
| <span class="n">SourceBundle</span><span class="p">(</span> |
| <span class="n">residual_weight</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> |
| <span class="n">split_pos</span><span class="p">,</span> |
| <span class="n">stop_pos</span><span class="p">)))</span> |
| <span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span> |
| <span class="c1"># For any exceptions from underlying trySplit calls, the wrapper will</span> |
| <span class="c1"># think that the source refuses to split at this point. In this case,</span> |
| <span class="c1"># no split happens at the wrapper level.</span> |
| <span class="k">return</span> <span class="kc">None</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_SDFBoundedSourceRestrictionTracker</span><span class="p">(</span><span class="n">RestrictionTracker</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""An `iobase.RestrictionTracker` implementations for wrapping BoundedSource</span> |
| <span class="sd"> with SDF. The tracked restriction is a _SDFBoundedSourceRestriction, which</span> |
| <span class="sd"> wraps SourceBundle and RangeTracker.</span> |
| |
| <span class="sd"> Delegated RangeTracker guarantees synchronization safety.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">restriction</span><span class="p">,</span> <span class="n">_SDFBoundedSourceRestriction</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'Initializing SDFBoundedSourceRestrictionTracker'</span> |
| <span class="s1">' requires a _SDFBoundedSourceRestriction. Got </span><span class="si">%s</span><span class="s1"> instead.'</span> <span class="o">%</span> |
| <span class="n">restriction</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span> <span class="o">=</span> <span class="n">restriction</span> |
| |
| <span class="k">def</span> <span class="nf">current_progress</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> RestrictionProgress</span> |
| <span class="k">return</span> <span class="n">RestrictionProgress</span><span class="p">(</span> |
| <span class="n">fraction</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">fraction_consumed</span><span class="p">())</span> |
| |
| <span class="k">def</span> <span class="nf">current_restriction</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span> |
| |
| <span class="k">def</span> <span class="nf">start_pos</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">start_position</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">stop_pos</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">stop_position</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">try_claim</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">position</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">try_claim</span><span class="p">(</span><span class="n">position</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">try_split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fraction_of_remainder</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">try_split</span><span class="p">(</span><span class="n">fraction_of_remainder</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">check_done</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">fraction_consumed</span><span class="p">()</span> <span class="o">>=</span> <span class="mf">1.0</span> |
| |
| <span class="k">def</span> <span class="nf">is_bounded</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="kc">True</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_SDFBoundedSourceWrapperRestrictionCoder</span><span class="p">(</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">decode</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">_SDFBoundedSourceRestriction</span><span class="p">(</span><span class="n">SourceBundle</span><span class="p">(</span><span class="o">*</span><span class="n">pickler</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">value</span><span class="p">)))</span> |
| |
| <span class="k">def</span> <span class="nf">encode</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">pickler</span><span class="o">.</span><span class="n">dumps</span><span class="p">((</span> |
| <span class="n">restriction</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">weight</span><span class="p">,</span> |
| <span class="n">restriction</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> |
| <span class="n">restriction</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">start_position</span><span class="p">,</span> |
| <span class="n">restriction</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">stop_position</span><span class="p">))</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_SDFBoundedSourceRestrictionProvider</span><span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">RestrictionProvider</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> A `RestrictionProvider` that is used by SDF for `BoundedSource`.</span> |
| |
| <span class="sd"> This restriction provider initializes restriction based on input</span> |
| <span class="sd"> element that is expected to be of BoundedSource type.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">desired_chunk_size</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">restriction_coder</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_desired_chunk_size</span> <span class="o">=</span> <span class="n">desired_chunk_size</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_restriction_coder</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">restriction_coder</span> <span class="ow">or</span> <span class="n">_SDFBoundedSourceWrapperRestrictionCoder</span><span class="p">())</span> |
| |
| <span class="k">def</span> <span class="nf">_check_source</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">src</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">src</span><span class="p">,</span> <span class="n">BoundedSource</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span> |
| <span class="s1">'SDFBoundedSourceRestrictionProvider can only utilize BoundedSource'</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">initial_restriction</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element_source</span><span class="p">:</span> <span class="n">BoundedSource</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_check_source</span><span class="p">(</span><span class="n">element_source</span><span class="p">)</span> |
| <span class="n">range_tracker</span> <span class="o">=</span> <span class="n">element_source</span><span class="o">.</span><span class="n">get_range_tracker</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">_SDFBoundedSourceRestriction</span><span class="p">(</span> |
| <span class="n">SourceBundle</span><span class="p">(</span> |
| <span class="kc">None</span><span class="p">,</span> |
| <span class="n">element_source</span><span class="p">,</span> |
| <span class="n">range_tracker</span><span class="o">.</span><span class="n">start_position</span><span class="p">(),</span> |
| <span class="n">range_tracker</span><span class="o">.</span><span class="n">stop_position</span><span class="p">()))</span> |
| |
| <span class="k">def</span> <span class="nf">create_tracker</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">_SDFBoundedSourceRestrictionTracker</span><span class="p">(</span><span class="n">restriction</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_desired_chunk_size</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">estimated_size</span> <span class="o">=</span> <span class="n">restriction</span><span class="o">.</span><span class="n">source</span><span class="p">()</span><span class="o">.</span><span class="n">estimate_size</span><span class="p">()</span> |
| <span class="k">except</span> <span class="ne">NotImplementedError</span><span class="p">:</span> |
| <span class="n">estimated_size</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_desired_chunk_size</span> <span class="o">=</span> <span class="n">Read</span><span class="o">.</span><span class="n">get_desired_chunk_size</span><span class="p">(</span><span class="n">estimated_size</span><span class="p">)</span> |
| |
| <span class="c1"># Invoke source.split to get initial splitting results.</span> |
| <span class="n">source_bundles</span> <span class="o">=</span> <span class="n">restriction</span><span class="o">.</span><span class="n">source</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_desired_chunk_size</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">source_bundle</span> <span class="ow">in</span> <span class="n">source_bundles</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">_SDFBoundedSourceRestriction</span><span class="p">(</span><span class="n">source_bundle</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">restriction_size</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">restriction</span><span class="o">.</span><span class="n">weight</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">restriction_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_restriction_coder</span> |
| |
| |
| <span class="k">class</span> <span class="nc">SDFBoundedSourceReader</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A ``PTransform`` that uses SDF to read from each ``BoundedSource`` in a</span> |
| <span class="sd"> PCollection.</span> |
| |
| <span class="sd"> NOTE: This transform can only be used with beam_fn_api enabled.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">data_to_display</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_data_to_display</span> <span class="o">=</span> <span class="n">data_to_display</span> <span class="ow">or</span> <span class="p">{}</span> |
| <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">_create_sdf_bounded_source_dofn</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">class</span> <span class="nc">SDFBoundedSourceDoFn</span><span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dd</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_dd</span> <span class="o">=</span> <span class="n">dd</span> |
| |
| <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dd</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">unused_element</span><span class="p">,</span> |
| <span class="n">restriction_tracker</span><span class="o">=</span><span class="n">core</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">RestrictionParam</span><span class="p">(</span> |
| <span class="n">_SDFBoundedSourceRestrictionProvider</span><span class="p">())):</span> |
| <span class="n">current_restriction</span> <span class="o">=</span> <span class="n">restriction_tracker</span><span class="o">.</span><span class="n">current_restriction</span><span class="p">()</span> |
| <span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">current_restriction</span><span class="p">,</span> <span class="n">_SDFBoundedSourceRestriction</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">current_restriction</span><span class="o">.</span><span class="n">source</span><span class="p">()</span><span class="o">.</span><span class="n">read</span><span class="p">(</span> |
| <span class="n">current_restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">())</span> |
| |
| <span class="k">return</span> <span class="n">SDFBoundedSourceDoFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_data_to_display</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pvalue</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">pvalue</span> <span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_create_sdf_bounded_source_dofn</span><span class="p">())</span> |
| |
| <span class="k">def</span> <span class="nf">get_windowing</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_inputs</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">core</span><span class="o">.</span><span class="n">Windowing</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span> |
| |
| <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_data_to_display</span> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| <footer> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <p> |
| © Copyright |
| |
| </p> |
| </div> |
| Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <script type="text/javascript"> |
| jQuery(function () { |
| SphinxRtdTheme.Navigation.enable(true); |
| }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |