<!-- blob: aecae56c82e7f0a36014510744c8893032e61a70 [file] [log] [blame] -->
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<!-- Document metadata -->
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>apache_beam.io.iobase &mdash; Apache Beam 2.47.0 documentation</title>
<!-- Scripts: order is preserved because later files depend on earlier ones -->
<script src="../../../_static/js/modernizr.min.js"></script>
<script id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script>
<script src="../../../_static/jquery.js"></script>
<script src="../../../_static/underscore.js"></script>
<script src="../../../_static/doctools.js"></script>
<script src="../../../_static/language_data.js"></script>
<!-- MathJax is loaded asynchronously from the CDN -->
<script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
<script src="../../../_static/js/theme.js"></script>
<!-- Stylesheets -->
<link rel="stylesheet" href="../../../_static/css/theme.css">
<link rel="stylesheet" href="../../../_static/pygments.css">
<!-- Related documents -->
<link rel="index" title="Index" href="../../../genindex.html">
<link rel="search" title="Search" href="../../../search.html">
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../index.html" class="icon icon-home"> Apache Beam
</a>
<div class="version">
2.47.0
</div>
<div role="search">
<!-- Documentation search: submits the query to search.html via GET. -->
<form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
<!-- aria-label supplies the accessible name; a placeholder alone is not a label. -->
<input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li>
</ul>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../index.html">Module code</a> &raquo;</li>
<li>apache_beam.io.iobase</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.io.iobase</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Sources and sinks.</span>
<span class="sd">A Source manages record-oriented data input from a particular kind of source</span>
<span class="sd">(e.g. a set of files, a database table, etc.). The reader() method of a source</span>
<span class="sd">returns a reader object supporting the iterator protocol; iteration yields</span>
<span class="sd">raw records of unprocessed, serialized data.</span>
<span class="sd">A Sink manages record-oriented data output to a particular kind of sink</span>
<span class="sd">(e.g. a set of files, a database table, etc.). The writer() method of a sink</span>
<span class="sd">returns a writer object supporting writing records of serialized data to</span>
<span class="sd">the sink.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="c1"># pytype: skip-file</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">math</span>
<span class="kn">import</span> <span class="nn">random</span>
<span class="kn">import</span> <span class="nn">uuid</span>
<span class="kn">from</span> <span class="nn">collections</span> <span class="kn">import</span> <span class="n">namedtuple</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Any</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Iterator</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Optional</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Tuple</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Union</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">coders</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">pvalue</span>
<span class="kn">from</span> <span class="nn">apache_beam.coders.coders</span> <span class="kn">import</span> <span class="n">_MemoizingPickleCoder</span>
<span class="kn">from</span> <span class="nn">apache_beam.internal</span> <span class="kn">import</span> <span class="n">pickler</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability</span> <span class="kn">import</span> <span class="n">common_urns</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability</span> <span class="kn">import</span> <span class="n">python_urns</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability.api</span> <span class="kn">import</span> <span class="n">beam_runner_api_pb2</span>
<span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="kn">import</span> <span class="n">AsIter</span>
<span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="kn">import</span> <span class="n">AsSingleton</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">Impulse</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">PTransform</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">core</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">ptransform</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">window</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="kn">import</span> <span class="n">DisplayDataItem</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="kn">import</span> <span class="n">HasDisplayData</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils</span> <span class="kn">import</span> <span class="n">timestamp</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils</span> <span class="kn">import</span> <span class="n">urns</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils.windowed_value</span> <span class="kn">import</span> <span class="n">WindowedValue</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span>
<span class="s1">&#39;BoundedSource&#39;</span><span class="p">,</span>
<span class="s1">&#39;RangeTracker&#39;</span><span class="p">,</span>
<span class="s1">&#39;Read&#39;</span><span class="p">,</span>
<span class="s1">&#39;RestrictionProgress&#39;</span><span class="p">,</span>
<span class="s1">&#39;RestrictionTracker&#39;</span><span class="p">,</span>
<span class="s1">&#39;WatermarkEstimator&#39;</span><span class="p">,</span>
<span class="s1">&#39;Sink&#39;</span><span class="p">,</span>
<span class="s1">&#39;Write&#39;</span><span class="p">,</span>
<span class="s1">&#39;Writer&#39;</span>
<span class="p">]</span>
<span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>
<span class="c1"># Encapsulates information about a bundle of a source generated when method</span>
<span class="c1"># BoundedSource.split() is invoked.</span>
<span class="c1"># This is a named 4-tuple that has the following fields.</span>
<span class="c1"># * weight - a number that represents the size of the bundle. This value will</span>
<span class="c1"># be used to compare the relative sizes of bundles generated by the</span>
<span class="c1"># current source.</span>
<span class="c1"># The weight returned here could be specified using a unit of your</span>
<span class="c1"># choice (for example, bundles of sizes 100MB, 200MB, and 700MB may</span>
<span class="c1"># specify weights 100, 200, 700 or 1, 2, 7) but all bundles of a</span>
<span class="c1"># source should specify the weight using the same unit.</span>
<span class="c1"># * source - a BoundedSource object for the bundle.</span>
<span class="c1"># * start_position - starting position of the bundle</span>
<span class="c1"># * stop_position - ending position of the bundle.</span>
<span class="c1">#</span>
<span class="c1"># Type for start and stop positions are specific to the bounded source and must</span>
<span class="c1"># be consistent throughout.</span>
<span class="n">SourceBundle</span> <span class="o">=</span> <span class="n">namedtuple</span><span class="p">(</span>
<span class="s1">&#39;SourceBundle&#39;</span><span class="p">,</span> <span class="s1">&#39;weight source start_position stop_position&#39;</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">SourceBase</span><span class="p">(</span><span class="n">HasDisplayData</span><span class="p">,</span> <span class="n">urns</span><span class="o">.</span><span class="n">RunnerApiFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Base class for all sources that can be passed to beam.io.Read(...).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">urns</span><span class="o">.</span><span class="n">RunnerApiFn</span><span class="o">.</span><span class="n">register_pickle_urn</span><span class="p">(</span><span class="n">python_urns</span><span class="o">.</span><span class="n">PICKLED_SOURCE</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">is_bounded</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; bool</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span>
<div class="viewcode-block" id="BoundedSource"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource">[docs]</a><span class="k">class</span> <span class="nc">BoundedSource</span><span class="p">(</span><span class="n">SourceBase</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A source that reads a finite amount of input records.</span>
<span class="sd"> This class defines the following operations which can be used to read the source</span>
<span class="sd"> efficiently.</span>
<span class="sd"> * Size estimation - method ``estimate_size()`` may return an accurate</span>
<span class="sd"> estimation in bytes for the size of the source.</span>
<span class="sd"> * Splitting into bundles of a given size - method ``split()`` can be used to</span>
<span class="sd"> split the source into a set of sub-sources (bundles) based on a desired</span>
<span class="sd"> bundle size.</span>
<span class="sd"> * Getting a RangeTracker - method ``get_range_tracker()`` should return a</span>
<span class="sd"> ``RangeTracker`` object for a given position range for the position type</span>
<span class="sd"> of the records returned by the source.</span>
<span class="sd"> * Reading the data - method ``read()`` can be used to read data from the</span>
<span class="sd"> source while respecting the boundaries defined by a given</span>
<span class="sd"> ``RangeTracker``.</span>
<span class="sd"> A runner reads the source in two steps.</span>
<span class="sd"> (1) Method ``get_range_tracker()`` will be invoked with start and end</span>
<span class="sd"> positions to obtain a ``RangeTracker`` for the range of positions the</span>
<span class="sd"> runner intends to read. Source must define a default initial start and end</span>
<span class="sd"> position range. These positions must be used if the start and/or end</span>
<span class="sd"> positions passed to the method ``get_range_tracker()`` are ``None``.</span>
<span class="sd"> (2) Method read() will be invoked with the ``RangeTracker`` obtained in the</span>
<span class="sd"> previous step.</span>
<span class="sd"> **Mutability**</span>
<span class="sd"> A ``BoundedSource`` object should not be mutated while</span>
<span class="sd"> its methods (for example, ``read()``) are being invoked by a runner. Runner</span>
<span class="sd"> implementations may invoke methods of ``BoundedSource`` objects through</span>
<span class="sd"> multi-threaded and/or reentrant execution modes.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="BoundedSource.estimate_size"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.estimate_size">[docs]</a> <span class="k">def</span> <span class="nf">estimate_size</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; Optional[int]</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Estimates the size of source in bytes.</span>
<span class="sd"> An estimate of the total size (in bytes) of the data that would be read</span>
<span class="sd"> from this source. This estimate is in terms of external storage size,</span>
<span class="sd"> before performing decompression or other processing.</span>
<span class="sd"> Returns:</span>
<span class="sd"> estimated size of the source if the size can be determined, ``None``</span>
<span class="sd"> otherwise.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="BoundedSource.split"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.split">[docs]</a> <span class="k">def</span> <span class="nf">split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span>
<span class="n">desired_bundle_size</span><span class="p">,</span> <span class="c1"># type: int</span>
<span class="n">start_position</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Any]</span>
<span class="n">stop_position</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Any]</span>
<span class="p">):</span>
<span class="c1"># type: (...) -&gt; Iterator[SourceBundle]</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Splits the source into a set of bundles.</span>
<span class="sd"> Bundles should be approximately of size ``desired_bundle_size`` bytes.</span>
<span class="sd"> Args:</span>
<span class="sd"> desired_bundle_size: the desired size (in bytes) of the bundles returned.</span>
<span class="sd"> start_position: if specified the given position must be used as the</span>
<span class="sd"> starting position of the first bundle.</span>
<span class="sd"> stop_position: if specified the given position must be used as the ending</span>
<span class="sd"> position of the last bundle.</span>
<span class="sd"> Returns:</span>
<span class="sd"> an iterator of objects of type &#39;SourceBundle&#39; that gives information about</span>
<span class="sd"> the generated bundles.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="BoundedSource.get_range_tracker"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.get_range_tracker">[docs]</a> <span class="k">def</span> <span class="nf">get_range_tracker</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span>
<span class="n">start_position</span><span class="p">,</span> <span class="c1"># type: Optional[Any]</span>
<span class="n">stop_position</span><span class="p">,</span> <span class="c1"># type: Optional[Any]</span>
<span class="p">):</span>
<span class="c1"># type: (...) -&gt; RangeTracker</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns a RangeTracker for a given position range.</span>
<span class="sd"> Framework may invoke ``read()`` method with the RangeTracker object returned</span>
<span class="sd"> here to read data from the source.</span>
<span class="sd"> Args:</span>
<span class="sd"> start_position: starting position of the range. If &#39;None&#39; default start</span>
<span class="sd"> position of the source must be used.</span>
<span class="sd"> stop_position: ending position of the range. If &#39;None&#39; default stop</span>
<span class="sd"> position of the source must be used.</span>
<span class="sd"> Returns:</span>
<span class="sd"> a ``RangeTracker`` for the given position range.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="BoundedSource.read"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.read">[docs]</a> <span class="k">def</span> <span class="nf">read</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">range_tracker</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns an iterator that reads data from the source.</span>
<span class="sd"> The returned set of data must respect the boundaries defined by the given</span>
<span class="sd"> ``RangeTracker`` object. For example:</span>
<span class="sd"> * Returned set of data must be for the range</span>
<span class="sd"> ``[range_tracker.start_position, range_tracker.stop_position)``. Note</span>
<span class="sd"> that a source may decide to return records that start after</span>
<span class="sd"> ``range_tracker.stop_position``. See documentation in class</span>
<span class="sd"> ``RangeTracker`` for more details. Also, note that framework might</span>
<span class="sd"> invoke ``range_tracker.try_split()`` to perform dynamic split</span>
<span class="sd"> operations. range_tracker.stop_position may be updated</span>
<span class="sd"> dynamically due to successful dynamic split operations.</span>
<span class="sd"> * Method ``range_tracker.try_split()`` must be invoked for every record</span>
<span class="sd"> that starts at a split point.</span>
<span class="sd"> * Method ``range_tracker.record_current_position()`` may be invoked for</span>
<span class="sd"> records that do not start at split points.</span>
<span class="sd"> Args:</span>
<span class="sd"> range_tracker: a ``RangeTracker`` whose boundaries must be respected</span>
<span class="sd"> when reading data from the source. A runner that reads this</span>
<span class="sd"> source must pass a ``RangeTracker`` object that is not</span>
<span class="sd"> ``None``.</span>
<span class="sd"> Returns:</span>
<span class="sd"> an iterator of data read by the source.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="BoundedSource.default_output_coder"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.default_output_coder">[docs]</a> <span class="k">def</span> <span class="nf">default_output_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Coder that should be used for the records returned by the source.</span>
<span class="sd"> Should be overridden by sources that produce objects that can be encoded</span>
<span class="sd"> more efficiently than pickling.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="nb">object</span><span class="p">)</span></div>
<div class="viewcode-block" id="BoundedSource.is_bounded"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.is_bounded">[docs]</a> <span class="k">def</span> <span class="nf">is_bounded</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="kc">True</span></div></div>
<div class="viewcode-block" id="RangeTracker"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker">[docs]</a><span class="k">class</span> <span class="nc">RangeTracker</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A thread safe object used by Dataflow source framework.</span>
<span class="sd"> A Dataflow source is defined using a ``BoundedSource`` and a ``RangeTracker``</span>
<span class="sd"> pair. A ``RangeTracker`` is used by the Dataflow source framework to perform</span>
<span class="sd"> dynamic work rebalancing of position-based sources.</span>
<span class="sd"> **Position-based sources**</span>
<span class="sd"> A position-based source is one where the source can be described by a range</span>
<span class="sd"> of positions of an ordered type and the records returned by the reader can be</span>
<span class="sd"> described by positions of the same type.</span>
<span class="sd"> In case a record occupies a range of positions in the source, the most</span>
<span class="sd"> important thing about the record is the position where it starts.</span>
<span class="sd"> Defining the semantics of positions for a source is entirely up to the source</span>
<span class="sd"> class, however the chosen definitions have to obey certain properties in order</span>
<span class="sd"> to make it possible to correctly split the source into parts, including</span>
<span class="sd"> dynamic splitting. Two main aspects need to be defined:</span>
<span class="sd"> 1. How to assign starting positions to records.</span>
<span class="sd"> 2. Which records should be read by a source with a range &#39;[A, B)&#39;.</span>
<span class="sd"> Moreover, reading a range must be *efficient*, i.e., the performance of</span>
<span class="sd"> reading a range should not significantly depend on the location of the range.</span>
<span class="sd"> For example, reading the range [A, B) should not require reading all data</span>
<span class="sd"> before &#39;A&#39;.</span>
<span class="sd"> The sections below explain exactly what properties these definitions must</span>
<span class="sd"> satisfy, and how to use a ``RangeTracker`` with a properly defined source.</span>
<span class="sd"> **Properties of position-based sources**</span>
<span class="sd"> The main requirement for position-based sources is *associativity*: reading</span>
<span class="sd"> records from &#39;[A, B)&#39; and records from &#39;[B, C)&#39; should give the same</span>
<span class="sd"> records as reading from &#39;[A, C)&#39;, where &#39;A &lt;= B &lt;= C&#39;. This property</span>
<span class="sd"> ensures that no matter how a range of positions is split into arbitrarily many</span>
<span class="sd"> sub-ranges, the total set of records described by them stays the same.</span>
<span class="sd"> The other important property is how the source&#39;s range relates to positions of</span>
<span class="sd"> records in the source. In many sources each record can be identified by a</span>
<span class="sd"> unique starting position. In this case:</span>
<span class="sd"> * All records returned by a source &#39;[A, B)&#39; must have starting positions in</span>
<span class="sd"> this range.</span>
<span class="sd"> * All but the last record should end within this range. The last record may or</span>
<span class="sd"> may not extend past the end of the range.</span>
<span class="sd"> * Records should not overlap.</span>
<span class="sd"> Such sources should define &quot;read &#39;[A, B)&#39;&quot; as &quot;read from the first record</span>
<span class="sd"> starting at or after &#39;A&#39;, up to but not including the first record starting</span>
<span class="sd"> at or after &#39;B&#39;&quot;.</span>
<span class="sd"> Some examples of such sources include reading lines or CSV from a text file,</span>
<span class="sd"> reading keys and values from a BigTable, etc.</span>
<span class="sd"> The concept of *split points* allows extending the definitions for dealing</span>
<span class="sd"> with sources where some records cannot be identified by a unique starting</span>
<span class="sd"> position.</span>
<span class="sd"> In all cases, all records returned by a source &#39;[A, B)&#39; must *start* at or</span>
<span class="sd"> after &#39;A&#39;.</span>
<span class="sd"> **Split points**</span>
<span class="sd"> Some sources may have records that are not directly addressable. For example,</span>
<span class="sd"> imagine a file format consisting of a sequence of compressed blocks. Each</span>
<span class="sd"> block can be assigned an offset, but records within the block cannot be</span>
<span class="sd"> directly addressed without decompressing the block. Let us refer to this</span>
<span class="sd"> hypothetical format as *CBF (Compressed Blocks Format)*.</span>
<span class="sd"> Many such formats can still satisfy the associativity property. For example,</span>
<span class="sd"> in CBF, reading &#39;[A, B)&#39; can mean &quot;read all the records in all blocks whose</span>
<span class="sd"> starting offset is in &#39;[A, B)&#39;&quot;.</span>
<span class="sd"> To support such complex formats, we introduce the notion of *split points*. We</span>
<span class="sd"> say that a record is a split point if there exists a position &#39;A&#39; such that</span>
<span class="sd"> the record is the first one to be returned when reading the range</span>
<span class="sd"> &#39;[A, infinity)&#39;. In CBF, the only split points would be the first records</span>
<span class="sd"> in each block.</span>
<span class="sd"> Split points allow us to define the meaning of a record&#39;s position and a</span>
<span class="sd"> source&#39;s range in all cases:</span>
<span class="sd"> * For a record that is at a split point, its position is defined to be the</span>
<span class="sd"> largest &#39;A&#39; such that reading a source with the range &#39;[A, infinity)&#39;</span>
<span class="sd"> returns this record.</span>
<span class="sd"> * Positions of other records are only required to be non-decreasing.</span>
<span class="sd"> * Reading the source &#39;[A, B)&#39; must return records starting from the first</span>
<span class="sd"> split point at or after &#39;A&#39;, up to but not including the first split point</span>
<span class="sd"> at or after &#39;B&#39;. In particular, this means that the first record returned</span>
<span class="sd"> by a source MUST always be a split point.</span>
<span class="sd"> * Positions of split points must be unique.</span>
<span class="sd"> As a result, for any decomposition of the full range of the source into</span>
<span class="sd"> position ranges, the total set of records will be the full set of records in</span>
<span class="sd"> the source, and each record will be read exactly once.</span>
<span class="sd"> **Consumed positions**</span>
<span class="sd"> As the source is being read, and records read from it are being passed to the</span>
<span class="sd"> downstream transforms in the pipeline, we say that positions in the source are</span>
<span class="sd"> being *consumed*. When a reader has read a record (or promised to a caller</span>
<span class="sd"> that a record will be returned), positions up to and including the record&#39;s</span>
<span class="sd"> start position are considered *consumed*.</span>
<span class="sd"> Dynamic splitting can happen only at *unconsumed* positions. If the reader</span>
<span class="sd"> just returned a record at offset 42 in a file, dynamic splitting can happen</span>
<span class="sd"> only at offset 43 or beyond, as otherwise that record could be read twice (by</span>
<span class="sd"> the current reader and by a reader of the task starting at 43).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">SPLIT_POINTS_UNKNOWN</span> <span class="o">=</span> <span class="nb">object</span><span class="p">()</span>
<div class="viewcode-block" id="RangeTracker.start_position"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.start_position">[docs]</a> <span class="k">def</span> <span class="nf">start_position</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns the starting position of the current range, inclusive.&quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div>
<div class="viewcode-block" id="RangeTracker.stop_position"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.stop_position">[docs]</a> <span class="k">def</span> <span class="nf">stop_position</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns the ending position of the current range, exclusive.&quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div>
<div class="viewcode-block" id="RangeTracker.try_claim"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.try_claim">[docs]</a> <span class="k">def</span> <span class="nf">try_claim</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">position</span><span class="p">):</span> <span class="c1"># pylint: disable=unused-argument</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Atomically determines if a record at a split point is within the range.</span>
<span class="sd"> This method should be called **if and only if** the record is at a split</span>
<span class="sd"> point. This method may modify the internal state of the ``RangeTracker`` by</span>
<span class="sd"> updating the last-consumed position to ``position``.</span>
<span class="sd"> **Thread safety**</span>
<span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span>
<span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span>
<span class="sd"> lock object.</span>
<span class="sd"> Args:</span>
<span class="sd"> position: starting position of a record being read by a source.</span>
<span class="sd"> Returns:</span>
<span class="sd"> ``True`` if the given position falls within the current range,</span>
<span class="sd"> ``False`` otherwise.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RangeTracker.set_current_position"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.set_current_position">[docs]</a> <span class="k">def</span> <span class="nf">set_current_position</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">position</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Updates the last-consumed position to the given position.</span>
<span class="sd"> A source may invoke this method for records that do not start at split</span>
<span class="sd"> points. This may modify the internal state of the ``RangeTracker``. If the</span>
<span class="sd"> record starts at a split point, method ``try_claim()`` **must** be invoked</span>
<span class="sd"> instead of this method.</span>
<span class="sd"> Args:</span>
<span class="sd"> position: starting position of a record being read by a source.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RangeTracker.position_at_fraction"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.position_at_fraction">[docs]</a> <span class="k">def</span> <span class="nf">position_at_fraction</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fraction</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns the position at the given fraction.</span>
<span class="sd"> Given a fraction within the range [0.0, 1.0) this method will return the</span>
<span class="sd"> position at the given fraction compared to the position range</span>
<span class="sd"> [self.start_position, self.stop_position).</span>
<span class="sd"> **Thread safety**</span>
<span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span>
<span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span>
<span class="sd"> lock object.</span>
<span class="sd"> Args:</span>
<span class="sd"> fraction: a float value within the range [0.0, 1.0).</span>
<span class="sd"> Returns:</span>
<span class="sd"> a position within the range [self.start_position, self.stop_position).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RangeTracker.try_split"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.try_split">[docs]</a> <span class="k">def</span> <span class="nf">try_split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">position</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Atomically splits the current range.</span>
<span class="sd"> Determines a position to split the current range, split_position, based on</span>
<span class="sd"> the given position. In most cases split_position and position will be the</span>
<span class="sd"> same.</span>
<span class="sd"> Splits the current range &#39;[self.start_position, self.stop_position)&#39;</span>
<span class="sd"> into a &quot;primary&quot; part &#39;[self.start_position, split_position)&#39; and a</span>
<span class="sd"> &quot;residual&quot; part &#39;[split_position, self.stop_position)&#39;, assuming the</span>
<span class="sd"> current last-consumed position is within</span>
<span class="sd"> &#39;[self.start_position, split_position)&#39; (i.e., split_position has not been</span>
<span class="sd"> consumed yet).</span>
<span class="sd"> If successful, updates the current range to be the primary and returns a</span>
<span class="sd"> tuple (split_position, split_fraction). split_fraction should be the</span>
<span class="sd"> fraction of size of range &#39;[self.start_position, split_position)&#39; compared</span>
<span class="sd"> to the original (before split) range</span>
<span class="sd"> &#39;[self.start_position, self.stop_position)&#39;.</span>
<span class="sd"> If the split_position has already been consumed, returns ``None``.</span>
<span class="sd"> **Thread safety**</span>
<span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span>
<span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span>
<span class="sd"> lock object.</span>
<span class="sd"> Args:</span>
<span class="sd"> position: suggested position where the current range should try to</span>
<span class="sd"> be split at.</span>
<span class="sd"> Returns:</span>
<span class="sd"> a tuple containing the split position and split fraction if split is</span>
<span class="sd"> successful. Returns ``None`` otherwise.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RangeTracker.fraction_consumed"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.fraction_consumed">[docs]</a> <span class="k">def</span> <span class="nf">fraction_consumed</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns the approximate fraction of consumed positions in the source.</span>
<span class="sd"> **Thread safety**</span>
<span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span>
<span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span>
<span class="sd"> lock object.</span>
<span class="sd"> Returns:</span>
<span class="sd"> the approximate fraction of positions that have been consumed by</span>
<span class="sd"> successful &#39;try_split()&#39; and &#39;try_claim()&#39; calls, or</span>
<span class="sd"> 0.0 if no such calls have happened.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RangeTracker.split_points"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.split_points">[docs]</a> <span class="k">def</span> <span class="nf">split_points</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Gives the number of split points consumed and remaining.</span>
<span class="sd"> For a ``RangeTracker`` used by a ``BoundedSource`` (within a</span>
<span class="sd"> ``BoundedSource.read()`` invocation) this method produces a 2-tuple that</span>
<span class="sd"> gives the number of split points consumed by the ``BoundedSource`` and the</span>
<span class="sd"> number of split points remaining within the range of the ``RangeTracker``</span>
<span class="sd"> that has not been consumed by the ``BoundedSource``.</span>
<span class="sd"> More specifically, given that the position of the current record being read</span>
<span class="sd"> by ``BoundedSource`` is current_position this method produces a tuple that</span>
<span class="sd"> consists of</span>
<span class="sd"> (1) number of split points in the range [self.start_position(),</span>
<span class="sd"> current_position) without including the split point that is currently being</span>
<span class="sd"> consumed. This represents the total amount of parallelism in the consumed</span>
<span class="sd"> part of the source.</span>
<span class="sd"> (2) number of split points within the range</span>
<span class="sd"> [current_position, self.stop_position()) including the split point that is</span>
<span class="sd"> currently being consumed. This represents the total amount of parallelism in</span>
<span class="sd"> the unconsumed part of the source.</span>
<span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span>
<span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span>
<span class="sd"> lock object.</span>
<span class="sd"> **General information about consumed and remaining number of split</span>
<span class="sd"> points returned by this method.**</span>
<span class="sd"> * Before a source read (``BoundedSource.read()`` invocation) claims the</span>
<span class="sd"> first split point, number of consumed split points is 0. This condition</span>
<span class="sd"> holds independent of whether the input is &quot;splittable&quot;. A splittable</span>
<span class="sd"> source is a source that has more than one split point.</span>
<span class="sd"> * Any source read that has only claimed one split point has 0 consumed</span>
<span class="sd"> split points since the first split point is the current split point and</span>
<span class="sd"> is still being processed. This condition holds independent of whether</span>
<span class="sd"> the input is splittable.</span>
<span class="sd"> * For an empty source read which never invokes</span>
<span class="sd"> ``RangeTracker.try_claim()``, the consumed number of split points is 0.</span>
<span class="sd"> This condition holds independent of whether the input is splittable.</span>
<span class="sd"> * For a source read which has invoked ``RangeTracker.try_claim()`` n</span>
<span class="sd"> times, the consumed number of split points is n - 1.</span>
<span class="sd"> * If a ``BoundedSource`` sets a callback through function</span>
<span class="sd"> ``set_split_points_unclaimed_callback()``, ``RangeTracker`` can use that</span>
<span class="sd"> callback when determining remaining number of split points.</span>
<span class="sd"> * Remaining split points should include the split point that is currently</span>
<span class="sd"> being consumed by the source read. Hence if the above callback returns</span>
<span class="sd"> an integer value n, remaining number of split points should be (n + 1).</span>
<span class="sd"> * After the last split point is claimed, remaining number of split points becomes 1,</span>
<span class="sd"> because this unfinished read itself represents an unfinished split</span>
<span class="sd"> point.</span>
<span class="sd"> * After all records of the source have been consumed, remaining number of</span>
<span class="sd"> split points becomes 0 and consumed number of split points becomes equal</span>
<span class="sd"> to the total number of split points within the range being read by the</span>
<span class="sd"> source. This method does not address this condition and will continue to</span>
<span class="sd"> report number of consumed split points as</span>
<span class="sd"> (&quot;total number of split points&quot; - 1) and number of remaining split</span>
<span class="sd"> points as 1. A runner that performs the reading of the source can</span>
<span class="sd"> detect when all records have been consumed and adjust remaining and</span>
<span class="sd"> consumed number of split points accordingly.</span>
<span class="sd"> **Examples**</span>
<span class="sd"> (1) A &quot;perfectly splittable&quot; input which can be read in parallel down to the</span>
<span class="sd"> individual records.</span>
<span class="sd"> Consider a perfectly splittable input that consists of 50 split points.</span>
<span class="sd"> * Before a source read (``BoundedSource.read()`` invocation) claims the</span>
<span class="sd"> first split point, number of consumed split points is 0 and number of</span>
<span class="sd"> remaining split points is 50.</span>
<span class="sd"> * After claiming first split point, consumed number of split points is 0</span>
<span class="sd"> and remaining number of split points is 50.</span>
<span class="sd"> * After claiming split point #30, consumed number of split points is 29</span>
<span class="sd"> and remaining number of split points is 21.</span>
<span class="sd"> * After claiming all 50 split points, consumed number of split points is</span>
<span class="sd"> 49 and remaining number of split points is 1.</span>
<span class="sd"> (2) a &quot;block-compressed&quot; file format such as ``avroio``, in which a block of</span>
<span class="sd"> records has to be read as a whole, but different blocks can be read in</span>
<span class="sd"> parallel.</span>
<span class="sd"> Consider a block compressed input that consists of 5 blocks.</span>
<span class="sd"> * Before a source read (``BoundedSource.read()`` invocation) claims the</span>
<span class="sd"> first split point (first block), number of consumed split points is 0</span>
<span class="sd"> and number of remaining split points is 5.</span>
<span class="sd"> * After claiming first split point, consumed number of split points is 0</span>
<span class="sd"> and remaining number of split points is 5.</span>
<span class="sd"> * After claiming split point #3, consumed number of split points is 2</span>
<span class="sd"> and remaining number of split points is 3.</span>
<span class="sd"> * After claiming all 5 split points, consumed number of split points is</span>
<span class="sd"> 4 and remaining number of split points is 1.</span>
<span class="sd"> (3) an &quot;unsplittable&quot; input such as a cursor in a database or a gzip</span>
<span class="sd"> compressed file.</span>
<span class="sd"> Such an input is considered to have only a single split point. Number of</span>
<span class="sd"> consumed split points is always 0 and number of remaining split points</span>
<span class="sd"> is always 1.</span>
<span class="sd"> By default ``RangeTracker`` returns ``RangeTracker.SPLIT_POINTS_UNKNOWN`` for</span>
<span class="sd"> both consumed and remaining number of split points, which indicates that the</span>
<span class="sd"> number of split points consumed and remaining is unknown.</span>
<span class="sd"> Returns:</span>
<span class="sd"> A pair that gives consumed and remaining number of split points. Consumed</span>
<span class="sd"> number of split points should be an integer larger than or equal to zero</span>
<span class="sd"> or ``RangeTracker.SPLIT_POINTS_UNKNOWN``. Remaining number of split points</span>
<span class="sd"> should be an integer larger than zero or</span>
<span class="sd"> ``RangeTracker.SPLIT_POINTS_UNKNOWN``.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">RangeTracker</span><span class="o">.</span><span class="n">SPLIT_POINTS_UNKNOWN</span><span class="p">,</span> <span class="n">RangeTracker</span><span class="o">.</span><span class="n">SPLIT_POINTS_UNKNOWN</span><span class="p">)</span></div>
<div class="viewcode-block" id="RangeTracker.set_split_points_unclaimed_callback"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.set_split_points_unclaimed_callback">[docs]</a> <span class="k">def</span> <span class="nf">set_split_points_unclaimed_callback</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">callback</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Sets a callback for determining the unclaimed number of split points.</span>
<span class="sd"> By invoking this function, a ``BoundedSource`` can set a callback function</span>
<span class="sd"> that may get invoked by the ``RangeTracker`` to determine the number of</span>
<span class="sd"> unclaimed split points. A split point is unclaimed if</span>
<span class="sd"> ``RangeTracker.try_claim()`` method has not been successfully invoked for</span>
<span class="sd"> that particular split point. The callback function accepts a single</span>
<span class="sd"> parameter, a stop position for the BoundedSource (stop_position). If the</span>
<span class="sd"> record currently being consumed by the ``BoundedSource`` is at position</span>
<span class="sd"> current_position, callback should return the number of split points within</span>
<span class="sd"> the range (current_position, stop_position). Note that this should not</span>
<span class="sd"> include the split point that is currently being consumed by the source.</span>
<span class="sd"> This function must be implemented by subclasses before being used.</span>
<span class="sd"> Args:</span>
<span class="sd"> callback: a function that takes a single parameter, a stop position,</span>
<span class="sd"> and returns unclaimed number of split points for the source read</span>
<span class="sd"> operation that is calling this function. Value returned from</span>
<span class="sd"> callback should be either an integer larger than or equal to</span>
<span class="sd"> zero or ``RangeTracker.SPLIT_POINTS_UNKNOWN``.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div>
<div class="viewcode-block" id="Sink"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink">[docs]</a><span class="k">class</span> <span class="nc">Sink</span><span class="p">(</span><span class="n">HasDisplayData</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;This class is deprecated, no backwards-compatibility guarantees.</span>
<span class="sd"> A resource that can be written to using the ``beam.io.Write`` transform.</span>
<span class="sd"> Here ``beam`` stands for Apache Beam Python code imported in the following manner:</span>
<span class="sd"> ``import apache_beam as beam``.</span>
<span class="sd"> A parallel write to an ``iobase.Sink`` consists of three phases:</span>
<span class="sd"> 1. A sequential *initialization* phase (e.g., creating a temporary output</span>
<span class="sd"> directory, etc.)</span>
<span class="sd"> 2. A parallel write phase where workers write *bundles* of records</span>
<span class="sd"> 3. A sequential *finalization* phase (e.g., committing the writes, merging</span>
<span class="sd"> output files, etc.)</span>
<span class="sd"> Implementing a new sink requires extending two classes.</span>
<span class="sd"> 1. iobase.Sink</span>
<span class="sd"> ``iobase.Sink`` is an immutable logical description of the location/resource</span>
<span class="sd"> to write to. Depending on the type of sink, it may contain fields such as the</span>
<span class="sd"> path to an output directory on a filesystem, a database table name,</span>
<span class="sd"> etc. ``iobase.Sink`` provides methods for performing a write operation to the</span>
<span class="sd"> sink described by it. To this end, implementors of an extension of</span>
<span class="sd"> ``iobase.Sink`` must implement three methods:</span>
<span class="sd"> ``initialize_write()``, ``open_writer()``, and ``finalize_write()``.</span>
<span class="sd"> 2. iobase.Writer</span>
<span class="sd"> ``iobase.Writer`` is used to write a single bundle of records. An</span>
<span class="sd"> ``iobase.Writer`` defines two methods: ``write()`` which writes a</span>
<span class="sd"> single record from the bundle and ``close()`` which is called once</span>
<span class="sd"> at the end of writing a bundle.</span>
<span class="sd"> See also ``apache_beam.io.filebasedsink.FileBasedSink`` which provides a</span>
<span class="sd"> simpler API for writing sinks that produce files.</span>
<span class="sd"> **Execution of the Write transform**</span>
<span class="sd"> ``initialize_write()``, ``pre_finalize()``, and ``finalize_write()`` are</span>
<span class="sd"> conceptually called once. However, implementors must</span>
<span class="sd"> ensure that these methods are *idempotent*, as they may be called multiple</span>
<span class="sd"> times on different machines in the case of failure/retry. A method may be</span>
<span class="sd"> called more than once concurrently, in which case it&#39;s okay to have a</span>
<span class="sd"> transient failure (such as due to a race condition). This failure should not</span>
<span class="sd"> prevent subsequent retries from succeeding.</span>
<span class="sd"> ``initialize_write()`` should perform any initialization that needs to be done</span>
<span class="sd"> prior to writing to the sink. ``initialize_write()`` may return a result</span>
<span class="sd"> (let&#39;s call this ``init_result``) that contains any parameters it wants to</span>
<span class="sd"> pass on to its writers about the sink. For example, a sink that writes to a</span>
<span class="sd"> file system may return an ``init_result`` that contains a dynamically</span>
<span class="sd"> generated unique directory to which data should be written.</span>
<span class="sd"> To perform writing of a bundle of elements, Dataflow execution engine will</span>
<span class="sd"> create an ``iobase.Writer`` using the implementation of</span>
<span class="sd"> ``iobase.Sink.open_writer()``. When invoking ``open_writer()`` execution</span>
<span class="sd"> engine will provide the ``init_result`` returned by ``initialize_write()``</span>
<span class="sd"> invocation as well as a *bundle id* (let&#39;s call this ``bundle_id``) that is</span>
<span class="sd"> unique for each invocation of ``open_writer()``.</span>
<span class="sd"> Execution engine will then invoke ``iobase.Writer.write()`` implementation for</span>
<span class="sd"> each element that has to be written. Once all elements of a bundle are</span>
<span class="sd"> written, execution engine will invoke ``iobase.Writer.close()`` implementation</span>
<span class="sd"> which should return a result (let&#39;s call this ``write_result``) that contains</span>
<span class="sd"> information that encodes the result of the write and, in most cases, some</span>
<span class="sd"> encoding of the unique bundle id. For example, if each bundle is written to a</span>
<span class="sd"> unique temporary file, ``close()`` method may return an object that contains</span>
<span class="sd"> the temporary file name. After writing of all bundles is complete, execution</span>
<span class="sd"> engine will invoke ``pre_finalize()`` and then ``finalize_write()``</span>
<span class="sd"> implementation.</span>
<span class="sd"> The execution of a write transform can be illustrated using following pseudo</span>
<span class="sd"> code (assume that the outer for loop happens in parallel across many</span>
<span class="sd"> machines)::</span>
<span class="sd"> init_result = sink.initialize_write()</span>
<span class="sd"> write_results = []</span>
<span class="sd"> for bundle in partition(pcoll):</span>
<span class="sd"> writer = sink.open_writer(init_result, generate_bundle_id())</span>
<span class="sd"> for elem in bundle:</span>
<span class="sd"> writer.write(elem)</span>
<span class="sd"> write_results.append(writer.close())</span>
<span class="sd"> pre_finalize_result = sink.pre_finalize(init_result, write_results)</span>
<span class="sd"> sink.finalize_write(init_result, write_results, pre_finalize_result)</span>
<span class="sd"> **init_result**</span>
<span class="sd"> Methods of &#39;iobase.Sink&#39; should agree on the &#39;init_result&#39; type that will be</span>
<span class="sd"> returned when initializing the sink. This type can be a client-defined object</span>
<span class="sd"> or an existing type. The returned type must be picklable using Dataflow coder</span>
<span class="sd"> ``coders.PickleCoder``. Returning an init_result is optional.</span>
<span class="sd"> **bundle_id**</span>
<span class="sd"> In order to ensure fault-tolerance, a bundle may be executed multiple times</span>
<span class="sd"> (e.g., in the event of failure/retry or for redundancy). However, exactly one</span>
<span class="sd"> of these executions will have its result passed to the</span>
<span class="sd"> ``iobase.Sink.finalize_write()`` method. Each call to</span>
<span class="sd"> ``iobase.Sink.open_writer()`` is passed a unique bundle id when it is called</span>
<span class="sd"> by the ``WriteImpl`` transform, so even redundant or retried bundles will have</span>
<span class="sd"> a unique way of identifying their output.</span>
<span class="sd"> The bundle id should be used to guarantee that a bundle&#39;s output is unique.</span>
<span class="sd"> This uniqueness guarantee is important; if a bundle is to be output to a file,</span>
<span class="sd"> for example, the name of the file must be unique to avoid conflicts with other</span>
<span class="sd"> writers. The bundle id should be encoded in the writer result returned by the</span>
<span class="sd"> writer and subsequently used by the ``finalize_write()`` method to identify</span>
<span class="sd"> the results of successful writes.</span>
<span class="sd"> For example, consider the scenario where a Writer writes files containing</span>
<span class="sd"> serialized records and the ``finalize_write()`` is to merge or rename these</span>
<span class="sd"> output files. In this case, a writer may use its unique id to name its output</span>
<span class="sd"> file (to avoid conflicts) and return the name of the file it wrote as its</span>
<span class="sd"> writer result. The ``finalize_write()`` will then receive an ``Iterable`` of</span>
<span class="sd"> output file names that it can then merge or rename using some bundle naming</span>
<span class="sd"> scheme.</span>
<span class="sd"> **write_result**</span>
<span class="sd"> ``iobase.Writer.close()`` and ``finalize_write()`` implementations must agree</span>
<span class="sd"> on type of the ``write_result`` object returned when invoking</span>
<span class="sd"> ``iobase.Writer.close()``. This type can be a client-defined object or</span>
<span class="sd"> an existing type. The returned type must be picklable using Dataflow coder</span>
<span class="sd"> ``coders.PickleCoder``. Returning a ``write_result`` when</span>
<span class="sd"> ``iobase.Writer.close()`` is invoked is optional but if unique</span>
<span class="sd"> ``write_result`` objects are not returned, sink should guarantee idempotency</span>
<span class="sd"> when the same bundle is written multiple times due to failure/retry or redundancy.</span>
<span class="sd"> **More information**</span>
<span class="sd"> For more information on creating new sinks please refer to the official</span>
<span class="sd"> documentation at</span>
<span class="sd"> ``https://beam.apache.org/documentation/sdks/python-custom-io#creating-sinks``</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Whether Beam should skip writing any shards if all are empty.</span>
<span class="n">skip_if_empty</span> <span class="o">=</span> <span class="kc">False</span>
<div class="viewcode-block" id="Sink.initialize_write"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink.initialize_write">[docs]</a> <span class="k">def</span> <span class="nf">initialize_write</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Initializes the sink before writing begins.</span>
<span class="sd"> Invoked before any data is written to the sink.</span>
<span class="sd"> Please see documentation in ``iobase.Sink`` for an example.</span>
<span class="sd"> Returns:</span>
<span class="sd"> An object that contains any sink specific state generated by</span>
<span class="sd"> initialization. This object will be passed to open_writer() and</span>
<span class="sd"> finalize_write() methods.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="Sink.open_writer"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink.open_writer">[docs]</a> <span class="k">def</span> <span class="nf">open_writer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">init_result</span><span class="p">,</span> <span class="n">uid</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Opens a writer for writing a bundle of elements to the sink.</span>
<span class="sd"> Args:</span>
<span class="sd"> init_result: the result of initialize_write() invocation.</span>
<span class="sd"> uid: a unique identifier generated by the system.</span>
<span class="sd"> Returns:</span>
<span class="sd"> an ``iobase.Writer`` that can be used to write a bundle of records to the</span>
<span class="sd"> current sink.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="Sink.pre_finalize"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink.pre_finalize">[docs]</a> <span class="k">def</span> <span class="nf">pre_finalize</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">init_result</span><span class="p">,</span> <span class="n">writer_results</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Pre-finalization stage for sink.</span>
<span class="sd"> Called after all bundle writes are complete and before finalize_write.</span>
<span class="sd"> Used to set up and verify filesystem and sink states.</span>
<span class="sd"> Args:</span>
<span class="sd"> init_result: the result of ``initialize_write()`` invocation.</span>
<span class="sd"> writer_results: an iterable containing results of ``Writer.close()``</span>
<span class="sd"> invocations. This will only contain results of successful writes, and</span>
<span class="sd"> will only contain the result of a single successful write for a given</span>
<span class="sd"> bundle.</span>
<span class="sd"> Returns:</span>
<span class="sd"> An object that contains any sink specific state generated.</span>
<span class="sd"> This object will be passed to finalize_write().</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="Sink.finalize_write"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink.finalize_write">[docs]</a> <span class="k">def</span> <span class="nf">finalize_write</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">init_result</span><span class="p">,</span> <span class="n">writer_results</span><span class="p">,</span> <span class="n">pre_finalize_result</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Finalizes the sink after all data is written to it.</span>
<span class="sd"> Given the result of initialization and an iterable of results from bundle</span>
<span class="sd"> writes, performs finalization after writing and closes the sink. Called</span>
<span class="sd"> after all bundle writes are complete.</span>
<span class="sd"> The bundle write results that are passed to finalize are those returned by</span>
<span class="sd"> bundles that completed successfully. Although bundles may have been run</span>
<span class="sd"> multiple times (for fault-tolerance), only one writer result will be passed</span>
<span class="sd"> to finalize for each bundle. An implementation of finalize should perform</span>
<span class="sd"> clean up of any failed and successfully retried bundles. Note that these</span>
<span class="sd"> failed bundles will not have their writer result passed to finalize, so</span>
<span class="sd"> finalize should be capable of locating any temporary/partial output written</span>
<span class="sd"> by failed bundles.</span>
<span class="sd"> If all retries of a bundle fail, the whole pipeline will fail *without*</span>
<span class="sd"> finalize_write() being invoked.</span>
<span class="sd"> A best practice is to make finalize atomic. If this is impossible given the</span>
<span class="sd"> semantics of the sink, finalize should be idempotent, as it may be called</span>
<span class="sd"> multiple times in the case of failure/retry or for redundancy.</span>
<span class="sd"> Note that the iteration order of the writer results is not guaranteed to be</span>
<span class="sd"> consistent if finalize is called multiple times.</span>
<span class="sd"> Args:</span>
<span class="sd"> init_result: the result of ``initialize_write()`` invocation.</span>
<span class="sd"> writer_results: an iterable containing results of ``Writer.close()``</span>
<span class="sd"> invocations. This will only contain results of successful writes, and</span>
<span class="sd"> will only contain the result of a single successful write for a given</span>
<span class="sd"> bundle.</span>
<span class="sd"> pre_finalize_result: the result of ``pre_finalize()`` invocation.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div>
<div class="viewcode-block" id="Writer"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Writer">[docs]</a><span class="k">class</span> <span class="nc">Writer</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;This class is deprecated, no backwards-compatibility guarantees.</span>
<span class="sd"> Writes a bundle of elements from a ``PCollection`` to a sink.</span>
<span class="sd"> A Writer&#39;s ``iobase.Writer.write()`` writes elements to the sink while</span>
<span class="sd"> ``iobase.Writer.close()`` is called after all elements in the bundle have been</span>
<span class="sd"> written.</span>
<span class="sd"> See ``iobase.Sink`` for more detailed documentation about the process of</span>
<span class="sd"> writing to a sink.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="Writer.write"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Writer.write">[docs]</a> <span class="k">def</span> <span class="nf">write</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Writes a value to the sink using the current writer.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="Writer.close"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Writer.close">[docs]</a> <span class="k">def</span> <span class="nf">close</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Closes the current writer.</span>
<span class="sd"> Please see documentation in ``iobase.Sink`` for an example.</span>
<span class="sd"> Returns:</span>
<span class="sd"> An object representing the writes that were performed by the current</span>
<span class="sd"> writer.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="Writer.at_capacity"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Writer.at_capacity">[docs]</a> <span class="k">def</span> <span class="nf">at_capacity</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">bool</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns whether this writer should be considered at capacity</span>
<span class="sd"> and a new one should be created.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="kc">False</span></div></div>
<div class="viewcode-block" id="Read"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read">[docs]</a><span class="k">class</span> <span class="nc">Read</span><span class="p">(</span><span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A transform that reads a PCollection.&quot;&quot;&quot;</span>
<span class="c1"># Import runners here to prevent circular imports</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.pipeline_context</span> <span class="kn">import</span> <span class="n">PipelineContext</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source</span><span class="p">):</span>
<span class="c1"># type: (SourceBase) -&gt; None</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Initializes a Read transform.</span>
<span class="sd"> Args:</span>
<span class="sd"> source: Data source to read from.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">source</span> <span class="o">=</span> <span class="n">source</span>
<div class="viewcode-block" id="Read.get_desired_chunk_size"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.get_desired_chunk_size">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">get_desired_chunk_size</span><span class="p">(</span><span class="n">total_size</span><span class="p">):</span>
<span class="k">if</span> <span class="n">total_size</span><span class="p">:</span>
<span class="c1"># 1MB = 1 shard, 1GB = 32 shards, 1TB = 1000 shards, 1PB = 32k shards</span>
<span class="n">chunk_size</span> <span class="o">=</span> <span class="nb">max</span><span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">20</span><span class="p">,</span> <span class="mi">1000</span> <span class="o">*</span> <span class="nb">int</span><span class="p">(</span><span class="n">math</span><span class="o">.</span><span class="n">sqrt</span><span class="p">(</span><span class="n">total_size</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">chunk_size</span> <span class="o">=</span> <span class="mi">64</span> <span class="o">&lt;&lt;</span> <span class="mi">20</span> <span class="c1"># 64mb</span>
<span class="k">return</span> <span class="n">chunk_size</span></div>
<div class="viewcode-block" id="Read.expand"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pbegin</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">BoundedSource</span><span class="p">):</span>
<span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">register_coder</span><span class="p">(</span><span class="n">BoundedSource</span><span class="p">,</span> <span class="n">_MemoizingPickleCoder</span><span class="p">)</span>
<span class="n">display_data</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">display_data</span><span class="p">()</span> <span class="ow">or</span> <span class="p">{}</span>
<span class="n">display_data</span><span class="p">[</span><span class="s1">&#39;source&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="vm">__class__</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">pbegin</span>
<span class="o">|</span> <span class="n">Impulse</span><span class="p">()</span>
<span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">)</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">BoundedSource</span><span class="p">)</span>
<span class="o">|</span> <span class="n">SDFBoundedSourceReader</span><span class="p">(</span><span class="n">display_data</span><span class="p">))</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="c1"># The Read transform can also admit a full PTransform as an input</span>
<span class="c1"># rather than an actual source. If the input is a PTransform, then</span>
<span class="c1"># just apply it directly.</span>
<span class="k">return</span> <span class="n">pbegin</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># Treat Read itself as a primitive.</span>
<span class="k">return</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">(</span>
<span class="n">pbegin</span><span class="o">.</span><span class="n">pipeline</span><span class="p">,</span> <span class="n">is_bounded</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">is_bounded</span><span class="p">())</span></div>
<div class="viewcode-block" id="Read.get_windowing"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.get_windowing">[docs]</a> <span class="k">def</span> <span class="nf">get_windowing</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_inputs</span><span class="p">):</span>
<span class="c1"># type: (...) -&gt; core.Windowing</span>
<span class="k">return</span> <span class="n">core</span><span class="o">.</span><span class="n">Windowing</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span></div>
<span class="k">def</span> <span class="nf">_infer_output_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">input_coder</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="c1"># type: (...) -&gt; Optional[coders.Coder]</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.dataflow.native_io</span> <span class="kn">import</span> <span class="n">iobase</span> <span class="k">as</span> <span class="n">dataflow_io</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">BoundedSource</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">default_output_coder</span><span class="p">()</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">dataflow_io</span><span class="o">.</span><span class="n">NativeSource</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">coder</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">None</span>
<div class="viewcode-block" id="Read.display_data"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">&#39;source&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s1">&#39;Read Source&#39;</span><span class="p">),</span>
<span class="s1">&#39;source_dd&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span>
<span class="p">}</span></div>
<div class="viewcode-block" id="Read.to_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.to_runner_api_parameter">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">context</span><span class="p">:</span> <span class="n">PipelineContext</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Tuple</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.dataflow.native_io</span> <span class="kn">import</span> <span class="n">iobase</span> <span class="k">as</span> <span class="n">dataflow_io</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="p">(</span><span class="n">BoundedSource</span><span class="p">,</span> <span class="n">dataflow_io</span><span class="o">.</span><span class="n">NativeSource</span><span class="p">)):</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.pubsub</span> <span class="kn">import</span> <span class="n">_PubSubSource</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">_PubSubSource</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">PUBSUB_READ</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubReadPayload</span><span class="p">(</span>
<span class="n">topic</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">full_topic</span><span class="p">,</span>
<span class="n">subscription</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">full_subscription</span><span class="p">,</span>
<span class="n">timestamp_attribute</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">timestamp_attribute</span><span class="p">,</span>
<span class="n">with_attributes</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">with_attributes</span><span class="p">,</span>
<span class="n">id_attribute</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">id_label</span><span class="p">))</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">deprecated_primitives</span><span class="o">.</span><span class="n">READ</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ReadPayload</span><span class="p">(</span>
<span class="n">source</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">(</span><span class="n">context</span><span class="p">),</span>
<span class="n">is_bounded</span><span class="o">=</span><span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">IsBounded</span><span class="o">.</span><span class="n">BOUNDED</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">is_bounded</span><span class="p">()</span> <span class="k">else</span>
<span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">IsBounded</span><span class="o">.</span><span class="n">UNBOUNDED</span><span class="p">))</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">to_runner_api_parameter</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span>
<span class="s2">&quot;to_runner_api_parameter not &quot;</span>
<span class="s2">&quot;implemented for type&quot;</span><span class="p">)</span></div>
<div class="viewcode-block" id="Read.from_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.from_runner_api_parameter">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">from_runner_api_parameter</span><span class="p">(</span>
<span class="n">transform</span><span class="p">:</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PTransform</span><span class="p">,</span>
<span class="n">payload</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ReadPayload</span><span class="p">,</span>
<span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubReadPayload</span><span class="p">],</span>
<span class="n">context</span><span class="p">:</span> <span class="n">PipelineContext</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;Read&quot;</span><span class="p">:</span>
<span class="k">if</span> <span class="n">transform</span><span class="o">.</span><span class="n">spec</span><span class="o">.</span><span class="n">urn</span> <span class="o">==</span> <span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">PUBSUB_READ</span><span class="o">.</span><span class="n">urn</span><span class="p">:</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">payload</span><span class="p">,</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubReadPayload</span><span class="p">)</span>
<span class="c1"># Importing locally to prevent circular dependencies.</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.pubsub</span> <span class="kn">import</span> <span class="n">_PubSubSource</span>
<span class="n">source</span> <span class="o">=</span> <span class="n">_PubSubSource</span><span class="p">(</span>
<span class="n">topic</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">topic</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">subscription</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">subscription</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">id_label</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">id_attribute</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">with_attributes</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">with_attributes</span><span class="p">,</span>
<span class="n">timestamp_attribute</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">timestamp_attribute</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">return</span> <span class="n">Read</span><span class="p">(</span><span class="n">source</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">payload</span><span class="p">,</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ReadPayload</span><span class="p">)</span>
<span class="k">return</span> <span class="n">Read</span><span class="p">(</span><span class="n">SourceBase</span><span class="o">.</span><span class="n">from_runner_api</span><span class="p">(</span><span class="n">payload</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">context</span><span class="p">))</span></div>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">_from_runner_api_parameter_read</span><span class="p">(</span>
<span class="n">transform</span><span class="p">:</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PTransform</span><span class="p">,</span>
<span class="n">payload</span><span class="p">:</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ReadPayload</span><span class="p">,</span>
<span class="n">context</span><span class="p">:</span> <span class="n">PipelineContext</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;Read&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Method for type proxying when calling register_urn due to limitations</span>
<span class="sd"> in type exprs in Python&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">Read</span><span class="o">.</span><span class="n">from_runner_api_parameter</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="n">payload</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">_from_runner_api_parameter_pubsub_read</span><span class="p">(</span>
<span class="n">transform</span><span class="p">:</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PTransform</span><span class="p">,</span>
<span class="n">payload</span><span class="p">:</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubReadPayload</span><span class="p">,</span>
<span class="n">context</span><span class="p">:</span> <span class="n">PipelineContext</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;Read&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Method for type proxying when calling register_urn due to limitations</span>
<span class="sd"> in type exprs in Python&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">Read</span><span class="o">.</span><span class="n">from_runner_api_parameter</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="n">payload</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span></div>
<span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">deprecated_primitives</span><span class="o">.</span><span class="n">READ</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ReadPayload</span><span class="p">,</span>
<span class="n">Read</span><span class="o">.</span><span class="n">_from_runner_api_parameter_read</span><span class="p">,</span>
<span class="p">)</span>
<span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">PUBSUB_READ</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubReadPayload</span><span class="p">,</span>
<span class="n">Read</span><span class="o">.</span><span class="n">_from_runner_api_parameter_pubsub_read</span><span class="p">,</span>
<span class="p">)</span>
<div class="viewcode-block" id="Write"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Write">[docs]</a><span class="k">class</span> <span class="nc">Write</span><span class="p">(</span><span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A ``PTransform`` that writes to a sink.</span>
<span class="sd"> A sink should inherit ``iobase.Sink``. Such implementations are</span>
<span class="sd"> handled using a composite transform that consists of three ``ParDo``s -</span>
<span class="sd"> (1) a ``ParDo`` performing a global initialization (2) a ``ParDo`` performing</span>
<span class="sd"> a parallel write and (3) a ``ParDo`` performing a global finalization. In the</span>
<span class="sd"> case of an empty ``PCollection``, only the global initialization and</span>
<span class="sd"> finalization will be performed. Currently only batch workflows support custom</span>
<span class="sd"> sinks.</span>
<span class="sd"> Example usage::</span>
<span class="sd"> pcollection | beam.io.Write(MySink())</span>
<span class="sd"> This returns a ``pvalue.PValue`` object that represents the end of the</span>
<span class="sd"> Pipeline.</span>
<span class="sd"> The sink argument may also be a full PTransform, in which case it will be</span>
<span class="sd"> applied directly. This allows composite sink-like transforms (e.g. a sink</span>
<span class="sd"> with some pre-processing DoFns) to be used the same as all other sinks.</span>
<span class="sd"> This transform also supports sinks that inherit ``iobase.NativeSink``. These</span>
<span class="sd"> are sinks that are implemented natively by the Dataflow service and hence</span>
<span class="sd"> should not be updated by users. These sinks are processed using a Dataflow</span>
<span class="sd"> native write transform.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Import runners here to prevent circular imports</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.pipeline_context</span> <span class="kn">import</span> <span class="n">PipelineContext</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sink</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Initializes a Write transform.</span>
<span class="sd"> Args:</span>
<span class="sd"> sink: Data sink to write to.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink</span> <span class="o">=</span> <span class="n">sink</span>
<div class="viewcode-block" id="Write.display_data"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Write.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="s1">&#39;sink&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span> <span class="s1">&#39;sink_dd&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">}</span></div>
<div class="viewcode-block" id="Write.expand"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Write.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.dataflow.native_io</span> <span class="kn">import</span> <span class="n">iobase</span> <span class="k">as</span> <span class="n">dataflow_io</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="n">dataflow_io</span><span class="o">.</span><span class="n">NativeSink</span><span class="p">):</span>
<span class="c1"># A native sink</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="s1">&#39;NativeWrite&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">dataflow_io</span><span class="o">.</span><span class="n">_NativeWrite</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="n">Sink</span><span class="p">):</span>
<span class="c1"># A custom sink</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">WriteImpl</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="c1"># This allows &quot;composite&quot; sinks to be used like non-composite ones.</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;A sink must inherit iobase.Sink, iobase.NativeSink, &#39;</span>
<span class="s1">&#39;or be a PTransform. Received : </span><span class="si">%r</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">)</span></div>
<div class="viewcode-block" id="Write.to_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Write.to_runner_api_parameter">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">context</span><span class="p">:</span> <span class="n">PipelineContext</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Tuple</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span>
<span class="c1"># Importing locally to prevent circular dependencies.</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.pubsub</span> <span class="kn">import</span> <span class="n">_PubSubSink</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="n">_PubSubSink</span><span class="p">):</span>
<span class="n">payload</span> <span class="o">=</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubWritePayload</span><span class="p">(</span>
<span class="n">topic</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">full_topic</span><span class="p">,</span>
<span class="n">id_attribute</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">id_label</span><span class="p">,</span>
<span class="n">timestamp_attribute</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">timestamp_attribute</span><span class="p">)</span>
<span class="k">return</span> <span class="p">(</span><span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">PUBSUB_WRITE</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> <span class="n">payload</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="n">to_runner_api_parameter</span><span class="p">(</span><span class="n">context</span><span class="p">)</span></div>
<div class="viewcode-block" id="Write.from_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Write.from_runner_api_parameter">[docs]</a> <span class="nd">@staticmethod</span>
<span class="nd">@ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">PUBSUB_WRITE</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubWritePayload</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">from_runner_api_parameter</span><span class="p">(</span>
<span class="n">ptransform</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span>
<span class="n">payload</span><span class="p">:</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PubSubWritePayload</span><span class="p">,</span>
<span class="n">unused_context</span><span class="p">:</span> <span class="n">PipelineContext</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;Write&quot;</span><span class="p">:</span>
<span class="k">if</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">spec</span><span class="o">.</span><span class="n">urn</span> <span class="o">!=</span> <span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">PUBSUB_WRITE</span><span class="o">.</span><span class="n">urn</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;Write transform cannot be constructed for the given proto </span><span class="si">%r</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">ptransform</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">payload</span><span class="o">.</span><span class="n">topic</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span>
<span class="s2">&quot;from_runner_api_parameter does not &quot;</span>
<span class="s2">&quot;handle empty or None topic&quot;</span><span class="p">)</span>
<span class="c1"># Importing locally to prevent circular dependencies.</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.pubsub</span> <span class="kn">import</span> <span class="n">_PubSubSink</span>
<span class="n">sink</span> <span class="o">=</span> <span class="n">_PubSubSink</span><span class="p">(</span>
<span class="n">topic</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">topic</span><span class="p">,</span>
<span class="n">id_label</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">id_attribute</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">timestamp_attribute</span><span class="o">=</span><span class="n">payload</span><span class="o">.</span><span class="n">timestamp_attribute</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">return</span> <span class="n">Write</span><span class="p">(</span><span class="n">sink</span><span class="p">)</span></div></div>
<span class="k">class</span> <span class="nc">WriteImpl</span><span class="p">(</span><span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Implements the writing of custom sinks.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sink</span><span class="p">):</span>
<span class="c1"># type: (Sink) -&gt; None</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink</span> <span class="o">=</span> <span class="n">sink</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="n">do_once</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">|</span> <span class="s1">&#39;DoOnce&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span>
<span class="n">init_result_coll</span> <span class="o">=</span> <span class="n">do_once</span> <span class="o">|</span> <span class="s1">&#39;InitializeWrite&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">_</span><span class="p">,</span> <span class="n">sink</span><span class="p">:</span> <span class="n">sink</span><span class="o">.</span><span class="n">initialize_write</span><span class="p">(),</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="s1">&#39;num_shards&#39;</span><span class="p">,</span> <span class="mi">0</span><span class="p">):</span>
<span class="n">min_shards</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">num_shards</span>
<span class="k">if</span> <span class="n">min_shards</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="n">keyed_pcoll</span> <span class="o">=</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">x</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">keyed_pcoll</span> <span class="o">=</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">_RoundRobinKeyFn</span><span class="p">(),</span> <span class="n">count</span><span class="o">=</span><span class="n">min_shards</span><span class="p">)</span>
<span class="n">write_result_coll</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">keyed_pcoll</span>
<span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span>
<span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span>
<span class="o">|</span> <span class="s1">&#39;WriteBundles&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_WriteKeyedBundleDoFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">),</span> <span class="n">AsSingleton</span><span class="p">(</span><span class="n">init_result_coll</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">min_shards</span> <span class="o">=</span> <span class="mi">1</span>
<span class="n">write_result_coll</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">pcoll</span>
<span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span>
<span class="o">|</span> <span class="s1">&#39;WriteBundles&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_WriteBundleDoFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">),</span> <span class="n">AsSingleton</span><span class="p">(</span><span class="n">init_result_coll</span><span class="p">))</span>
<span class="o">|</span> <span class="s1">&#39;Pair&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">x</span><span class="p">))</span>
<span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span>
<span class="o">|</span> <span class="s1">&#39;Extract&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">]))</span>
<span class="c1"># PreFinalize should run before FinalizeWrite, and the two should not be</span>
<span class="c1"># fused.</span>
<span class="n">pre_finalize_coll</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">do_once</span>
<span class="o">|</span> <span class="s1">&#39;PreFinalize&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span>
<span class="n">_pre_finalize</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span>
<span class="n">AsSingleton</span><span class="p">(</span><span class="n">init_result_coll</span><span class="p">),</span>
<span class="n">AsIter</span><span class="p">(</span><span class="n">write_result_coll</span><span class="p">)))</span>
<span class="k">return</span> <span class="n">do_once</span> <span class="o">|</span> <span class="s1">&#39;FinalizeWrite&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span>
<span class="n">_finalize_write</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span>
<span class="n">AsSingleton</span><span class="p">(</span><span class="n">init_result_coll</span><span class="p">),</span>
<span class="n">AsIter</span><span class="p">(</span><span class="n">write_result_coll</span><span class="p">),</span>
<span class="n">min_shards</span><span class="p">,</span>
<span class="n">AsSingleton</span><span class="p">(</span><span class="n">pre_finalize_coll</span><span class="p">))</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="nb">str</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_WriteBundleDoFn</span><span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A DoFn for writing elements to an iobase.Writer.</span>
<span class="sd"> Opens a writer at the first element and closes the writer at finish_bundle().</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sink</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink</span> <span class="o">=</span> <span class="n">sink</span>
<span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="s1">&#39;sink_dd&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">}</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">init_result</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># We ignore UUID collisions here since they are extremely rare.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">open_writer</span><span class="p">(</span><span class="n">init_result</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">()))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="o">.</span><span class="n">at_capacity</span><span class="p">():</span>
<span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">WindowedValue</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">(),</span>
<span class="n">window</span><span class="o">.</span><span class="n">GlobalWindow</span><span class="p">()</span><span class="o">.</span><span class="n">max_timestamp</span><span class="p">(),</span> <span class="p">[</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindow</span><span class="p">()])</span>
<span class="k">class</span> <span class="nc">_WriteKeyedBundleDoFn</span><span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sink</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink</span> <span class="o">=</span> <span class="n">sink</span>
<span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="s1">&#39;sink_dd&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">}</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">init_result</span><span class="p">):</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="n">element</span>
<span class="n">writer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">open_writer</span><span class="p">(</span><span class="n">init_result</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">()))</span>
<span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">bundle</span><span class="p">[</span><span class="mi">1</span><span class="p">]:</span> <span class="c1"># values</span>
<span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
<span class="k">return</span> <span class="p">[</span><span class="n">window</span><span class="o">.</span><span class="n">TimestampedValue</span><span class="p">(</span><span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">(),</span> <span class="n">timestamp</span><span class="o">.</span><span class="n">MAX_TIMESTAMP</span><span class="p">)]</span>
<span class="k">def</span> <span class="nf">_pre_finalize</span><span class="p">(</span><span class="n">unused_element</span><span class="p">,</span> <span class="n">sink</span><span class="p">,</span> <span class="n">init_result</span><span class="p">,</span> <span class="n">write_results</span><span class="p">):</span>
<span class="k">return</span> <span class="n">sink</span><span class="o">.</span><span class="n">pre_finalize</span><span class="p">(</span><span class="n">init_result</span><span class="p">,</span> <span class="n">write_results</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_finalize_write</span><span class="p">(</span>
<span class="n">unused_element</span><span class="p">,</span>
<span class="n">sink</span><span class="p">,</span>
<span class="n">init_result</span><span class="p">,</span>
<span class="n">write_results</span><span class="p">,</span>
<span class="n">min_shards</span><span class="p">,</span>
<span class="n">pre_finalize_results</span><span class="p">):</span>
<span class="n">write_results</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">write_results</span><span class="p">)</span>
<span class="n">extra_shards</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">write_results</span><span class="p">)</span> <span class="o">&lt;</span> <span class="n">min_shards</span><span class="p">:</span>
<span class="k">if</span> <span class="n">write_results</span> <span class="ow">or</span> <span class="ow">not</span> <span class="n">sink</span><span class="o">.</span><span class="n">skip_if_empty</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span>
<span class="s1">&#39;Creating </span><span class="si">%s</span><span class="s1"> empty shard(s).&#39;</span><span class="p">,</span> <span class="n">min_shards</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="n">write_results</span><span class="p">))</span>
<span class="k">for</span> <span class="n">_</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="n">min_shards</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="n">write_results</span><span class="p">)):</span>
<span class="n">writer</span> <span class="o">=</span> <span class="n">sink</span><span class="o">.</span><span class="n">open_writer</span><span class="p">(</span><span class="n">init_result</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">()))</span>
<span class="n">extra_shards</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">())</span>
<span class="n">outputs</span> <span class="o">=</span> <span class="n">sink</span><span class="o">.</span><span class="n">finalize_write</span><span class="p">(</span>
<span class="n">init_result</span><span class="p">,</span> <span class="n">write_results</span> <span class="o">+</span> <span class="n">extra_shards</span><span class="p">,</span> <span class="n">pre_finalize_results</span><span class="p">)</span>
<span class="k">if</span> <span class="n">outputs</span><span class="p">:</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">window</span><span class="o">.</span><span class="n">TimestampedValue</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">.</span><span class="n">MAX_TIMESTAMP</span><span class="p">)</span> <span class="k">for</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">outputs</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_RoundRobinKeyFn</span><span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">counter</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">count</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">counter</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">counter</span> <span class="o">=</span> <span class="n">random</span><span class="o">.</span><span class="n">randrange</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="n">count</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">counter</span> <span class="o">=</span> <span class="p">(</span><span class="mi">1</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">counter</span><span class="p">)</span> <span class="o">%</span> <span class="n">count</span>
<span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">counter</span><span class="p">,</span> <span class="n">element</span>
<div class="viewcode-block" id="RestrictionTracker"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker">[docs]</a><span class="k">class</span> <span class="nc">RestrictionTracker</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Manages access to a restriction.</span>
<span class="sd"> Keeps track of the restriction&#39;s claimed part for a Splittable DoFn.</span>
<span class="sd"> The restriction may be modified by different threads, however the system will</span>
<span class="sd"> ensure sufficient locking such that no methods on the restriction tracker</span>
<span class="sd"> will be called concurrently.</span>
<span class="sd"> See following documents for more details.</span>
<span class="sd"> * https://s.apache.org/splittable-do-fn</span>
<span class="sd"> * https://s.apache.org/splittable-do-fn-python-sdk</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="RestrictionTracker.current_restriction"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.current_restriction">[docs]</a> <span class="k">def</span> <span class="nf">current_restriction</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns the current restriction.</span>
<span class="sd"> Returns a restriction accurately describing the full range of work the</span>
<span class="sd"> current ``DoFn.process()`` call will do, including already completed work.</span>
<span class="sd"> The current restriction returned by this method may be updated dynamically</span>
<span class="sd"> due to concurrent invocation of other methods of the</span>
<span class="sd"> ``RestrictionTracker``, for example, ``split()``.</span>
<span class="sd"> This API is required to be implemented.</span>
<span class="sd"> Returns: a restriction object.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RestrictionTracker.current_progress"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.current_progress">[docs]</a> <span class="k">def</span> <span class="nf">current_progress</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; RestrictionProgress</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns a RestrictionProgress object representing the current progress.</span>
<span class="sd"> This API is recommended to be implemented. The runner can do a better job</span>
<span class="sd"> at parallel processing with better progress signals.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RestrictionTracker.check_done"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.check_done">[docs]</a> <span class="k">def</span> <span class="nf">check_done</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Checks whether the restriction has been fully processed.</span>
<span class="sd"> Called by the SDK harness after iterator returned by ``DoFn.process()``</span>
<span class="sd"> has been fully read.</span>
<span class="sd"> This method must raise a `ValueError` if there is still any unclaimed work</span>
<span class="sd"> remaining in the restriction when this method is invoked. Exception raised</span>
<span class="sd"> must have an informative error message.</span>
<span class="sd"> This API is required to be implemented in order to make sure there is no</span>
<span class="sd"> data loss during SDK processing.</span>
<span class="sd"> Returns: ``True`` if current restriction has been fully processed.</span>
<span class="sd"> Raises:</span>
<span class="sd"> ValueError: if there is still any unclaimed work remaining.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RestrictionTracker.try_split"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.try_split">[docs]</a> <span class="k">def</span> <span class="nf">try_split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fraction_of_remainder</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Splits current restriction based on fraction_of_remainder.</span>
<span class="sd"> If splitting the current restriction is possible, the current restriction is</span>
<span class="sd"> split into a primary and residual restriction pair. This invocation updates</span>
<span class="sd"> the ``current_restriction()`` to be the primary restriction effectively</span>
<span class="sd"> having the current ``DoFn.process()`` execution responsible for performing</span>
<span class="sd"> the work that the primary restriction represents. The residual restriction</span>
<span class="sd"> will be executed in a separate ``DoFn.process()`` invocation (likely in a</span>
<span class="sd"> different process). The work performed by executing the primary and residual</span>
<span class="sd"> restrictions as separate ``DoFn.process()`` invocations MUST be equivalent</span>
<span class="sd"> to the work performed as if this split never occurred.</span>
<span class="sd"> The ``fraction_of_remainder`` should be used in a best effort manner to</span>
<span class="sd"> choose a primary and residual restriction based upon the fraction of the</span>
<span class="sd"> remaining work that the current ``DoFn.process()`` invocation is responsible</span>
<span class="sd"> for. For example, if a ``DoFn.process()`` was reading a file with a</span>
<span class="sd"> restriction representing the offset range [100, 200) and has processed up to</span>
<span class="sd"> offset 130 with a fraction_of_remainder of 0.7, the primary and residual</span>
<span class="sd"> restrictions returned would be [100, 179), [179, 200) (note: current_offset</span>
<span class="sd"> + fraction_of_remainder * remaining_work = 130 + 0.7 * 70 = 179).</span>
<span class="sd"> ``fraction_of_remainder`` = 0 means a checkpoint is required.</span>
<span class="sd"> The API is recommended to be implemented for batch pipeline given that it is</span>
<span class="sd"> very important for pipeline scaling and end to end pipeline execution.</span>
<span class="sd"> The API is required to be implemented for a streaming pipeline.</span>
<span class="sd"> Args:</span>
<span class="sd"> fraction_of_remainder: A hint as to the fraction of work the primary</span>
<span class="sd"> restriction should represent based upon the current known remaining</span>
<span class="sd"> amount of work.</span>
<span class="sd"> Returns:</span>
<span class="sd"> (primary_restriction, residual_restriction) if a split was possible,</span>
<span class="sd"> otherwise returns ``None``.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RestrictionTracker.try_claim"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.try_claim">[docs]</a> <span class="k">def</span> <span class="nf">try_claim</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">position</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Attempts to claim the block of work in the current restriction</span>
<span class="sd"> identified by the given position. Each claimed position MUST be a valid</span>
<span class="sd"> split point.</span>
<span class="sd"> If this succeeds, the DoFn MUST execute the entire block of work. If it</span>
<span class="sd"> fails, the ``DoFn.process()`` MUST return ``None`` without performing any</span>
<span class="sd"> additional work or emitting output (note that emitting output or performing</span>
<span class="sd"> work from ``DoFn.process()`` is also not allowed before the first call of</span>
<span class="sd"> this method).</span>
<span class="sd"> The API is required to be implemented.</span>
<span class="sd"> Args:</span>
<span class="sd"> position: current position that wants to be claimed.</span>
<span class="sd"> Returns: ``True`` if the position can be claimed as current_position.</span>
<span class="sd"> Otherwise, returns ``False``.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RestrictionTracker.is_bounded"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.is_bounded">[docs]</a> <span class="k">def</span> <span class="nf">is_bounded</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns whether the amount of work represented by the current restriction</span>
<span class="sd"> is bounded.</span>
<span class="sd"> The boundedness of the restriction is used to determine the default behavior</span>
<span class="sd"> of how to truncate restrictions when a pipeline is being</span>
<span class="sd"> `drained &lt;https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#&gt;`_. # pylint: disable=line-too-long</span>
<span class="sd"> If the restriction is bounded, then the entire restriction will be processed</span>
<span class="sd"> otherwise the restriction will be processed till a checkpoint is possible.</span>
<span class="sd"> The API is required to be implemented.</span>
<span class="sd"> Returns: ``True`` if the restriction represents a finite amount of work.</span>
<span class="sd"> Otherwise, returns ``False``.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div>
<div class="viewcode-block" id="WatermarkEstimator"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.WatermarkEstimator">[docs]</a><span class="k">class</span> <span class="nc">WatermarkEstimator</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A WatermarkEstimator which is used for estimating output_watermark based on</span>
<span class="sd"> the timestamp of output records or manual modifications. Please refer to</span>
<span class="sd"> ``watermark_estimators`` for commonly used watermark estimators.</span>
<span class="sd"> The base class provides common APIs that are called by the framework, which</span>
<span class="sd"> are also accessible inside a DoFn.process() body. Derived watermark estimator</span>
<span class="sd"> should implement all APIs listed below. Additional methods can be implemented</span>
<span class="sd"> and will be available when invoked within a DoFn.</span>
<span class="sd"> Internal state must not be updated asynchronously.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="WatermarkEstimator.get_estimator_state"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.WatermarkEstimator.get_estimator_state">[docs]</a> <span class="k">def</span> <span class="nf">get_estimator_state</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Get current state of the WatermarkEstimator instance, which can be used</span>
<span class="sd"> to recreate the WatermarkEstimator when processing the restriction. See</span>
<span class="sd"> WatermarkEstimatorProvider.create_watermark_estimator.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div>
<div class="viewcode-block" id="WatermarkEstimator.current_watermark"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.WatermarkEstimator.current_watermark">[docs]</a> <span class="k">def</span> <span class="nf">current_watermark</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; timestamp.Timestamp</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Return estimated output_watermark. This function must return</span>
<span class="sd"> monotonically increasing watermarks.&quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div>
<div class="viewcode-block" id="WatermarkEstimator.observe_timestamp"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.WatermarkEstimator.observe_timestamp">[docs]</a> <span class="k">def</span> <span class="nf">observe_timestamp</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">):</span>
<span class="c1"># type: (timestamp.Timestamp) -&gt; None</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Update tracking watermark with latest output timestamp.</span>
<span class="sd"> Args:</span>
<span class="sd"> timestamp: the `timestamp.Timestamp` of current output element.</span>
<span class="sd"> This is called with the timestamp of every element output from the DoFn.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div></div>
<div class="viewcode-block" id="RestrictionProgress"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionProgress">[docs]</a><span class="k">class</span> <span class="nc">RestrictionProgress</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Used to record the progress of a restriction.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="c1"># Only accept keyword arguments.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">&#39;fraction&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_completed</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">&#39;completed&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">&#39;remaining&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">kwargs</span>
<span class="k">def</span> <span class="fm">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s1">&#39;RestrictionProgress(fraction=</span><span class="si">%s</span><span class="s1">, completed=</span><span class="si">%s</span><span class="s1">, remaining=</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_completed</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span><span class="p">)</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">completed_work</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; float</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_completed</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_completed</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span> <span class="o">*</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> <span class="o">/</span> <span class="p">(</span><span class="mi">1</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">remaining_work</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; float</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_completed</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_completed</span> <span class="o">*</span> <span class="p">(</span><span class="mi">1</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span><span class="p">)</span> <span class="o">/</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="mi">1</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">total_work</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; float</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">completed_work</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">remaining_work</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">fraction_completed</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; float</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">float</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_completed</span><span class="p">)</span> <span class="o">/</span> <span class="bp">self</span><span class="o">.</span><span class="n">total_work</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">fraction_remaining</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; float</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="mi">1</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">float</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span><span class="p">)</span> <span class="o">/</span> <span class="bp">self</span><span class="o">.</span><span class="n">total_work</span>
<div class="viewcode-block" id="RestrictionProgress.with_completed"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionProgress.with_completed">[docs]</a> <span class="k">def</span> <span class="nf">with_completed</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">completed</span><span class="p">):</span>
<span class="c1"># type: (int) -&gt; RestrictionProgress</span>
<span class="k">return</span> <span class="n">RestrictionProgress</span><span class="p">(</span>
<span class="n">fraction</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_fraction</span><span class="p">,</span> <span class="n">remaining</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_remaining</span><span class="p">,</span> <span class="n">completed</span><span class="o">=</span><span class="n">completed</span><span class="p">)</span></div></div>
<span class="k">class</span> <span class="nc">_SDFBoundedSourceRestriction</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot; A restriction wraps SourceBundle and RangeTracker. &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source_bundle</span><span class="p">,</span> <span class="n">range_tracker</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span> <span class="o">=</span> <span class="n">source_bundle</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_range_tracker</span> <span class="o">=</span> <span class="n">range_tracker</span>
<span class="k">def</span> <span class="nf">__reduce__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># The instance of RangeTracker shouldn&#39;t be serialized.</span>
<span class="k">return</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="p">,</span> <span class="p">))</span>
<span class="k">def</span> <span class="nf">range_tracker</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_range_tracker</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_range_tracker</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">get_range_tracker</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">start_position</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">stop_position</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_range_tracker</span>
<span class="k">def</span> <span class="nf">weight</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">weight</span>
<span class="k">def</span> <span class="nf">source</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">source</span>
<span class="k">def</span> <span class="nf">try_split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fraction_of_remainder</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">consumed_fraction</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">fraction_consumed</span><span class="p">()</span>
<span class="n">fraction</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">consumed_fraction</span> <span class="o">+</span> <span class="p">(</span><span class="mi">1</span> <span class="o">-</span> <span class="n">consumed_fraction</span><span class="p">)</span> <span class="o">*</span> <span class="n">fraction_of_remainder</span><span class="p">)</span>
<span class="n">position</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">position_at_fraction</span><span class="p">(</span><span class="n">fraction</span><span class="p">)</span>
<span class="c1"># Need to stash current stop_pos before splitting since</span>
<span class="c1"># range_tracker.split will update its stop_pos if splits</span>
<span class="c1"># successfully.</span>
<span class="n">stop_pos</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">stop_position</span>
<span class="n">split_result</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">try_split</span><span class="p">(</span><span class="n">position</span><span class="p">)</span>
<span class="k">if</span> <span class="n">split_result</span><span class="p">:</span>
<span class="n">split_pos</span><span class="p">,</span> <span class="n">split_fraction</span> <span class="o">=</span> <span class="n">split_result</span>
<span class="n">primary_weight</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">weight</span> <span class="o">*</span> <span class="n">split_fraction</span>
<span class="n">residual_weight</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">weight</span> <span class="o">-</span> <span class="n">primary_weight</span>
<span class="c1"># Update self to primary weight and end position.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span> <span class="o">=</span> <span class="n">SourceBundle</span><span class="p">(</span>
<span class="n">primary_weight</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">source</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">start_position</span><span class="p">,</span>
<span class="n">split_pos</span><span class="p">)</span>
<span class="k">return</span> <span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">_SDFBoundedSourceRestriction</span><span class="p">(</span>
<span class="n">SourceBundle</span><span class="p">(</span>
<span class="n">residual_weight</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">source</span><span class="p">,</span>
<span class="n">split_pos</span><span class="p">,</span>
<span class="n">stop_pos</span><span class="p">)))</span>
<span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span>
<span class="c1"># For any exceptions from underlying trySplit calls, the wrapper will</span>
<span class="c1"># think that the source refuses to split at this point. In this case,</span>
<span class="c1"># no split happens at the wrapper level.</span>
<span class="k">return</span> <span class="kc">None</span>
<span class="k">class</span> <span class="nc">_SDFBoundedSourceRestrictionTracker</span><span class="p">(</span><span class="n">RestrictionTracker</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;An `iobase.RestrictionTracker` implementation for wrapping BoundedSource</span>
<span class="sd"> with SDF. The tracked restriction is a _SDFBoundedSourceRestriction, which</span>
<span class="sd"> wraps SourceBundle and RangeTracker.</span>
<span class="sd"> Delegated RangeTracker guarantees synchronization safety.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">restriction</span><span class="p">,</span> <span class="n">_SDFBoundedSourceRestriction</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;Initializing SDFBoundedSourceRestrictionTracker&#39;</span>
<span class="s1">&#39; requires a _SDFBoundedSourceRestriction. Got </span><span class="si">%s</span><span class="s1"> instead.&#39;</span> <span class="o">%</span>
<span class="n">restriction</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">restriction</span> <span class="o">=</span> <span class="n">restriction</span>
<span class="k">def</span> <span class="nf">current_progress</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; RestrictionProgress</span>
<span class="k">return</span> <span class="n">RestrictionProgress</span><span class="p">(</span>
<span class="n">fraction</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">fraction_consumed</span><span class="p">())</span>
<span class="k">def</span> <span class="nf">current_restriction</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span>
<span class="k">def</span> <span class="nf">start_pos</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">start_position</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">stop_pos</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">stop_position</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">try_claim</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">position</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">try_claim</span><span class="p">(</span><span class="n">position</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">try_split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fraction_of_remainder</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">try_split</span><span class="p">(</span><span class="n">fraction_of_remainder</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">check_done</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">()</span><span class="o">.</span><span class="n">fraction_consumed</span><span class="p">()</span> <span class="o">&gt;=</span> <span class="mf">1.0</span>
<span class="k">def</span> <span class="nf">is_bounded</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">class</span> <span class="nc">_SDFBoundedSourceWrapperRestrictionCoder</span><span class="p">(</span><span class="n">coders</span><span class="o">.</span><span class="n">Coder</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">decode</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="k">return</span> <span class="n">_SDFBoundedSourceRestriction</span><span class="p">(</span><span class="n">SourceBundle</span><span class="p">(</span><span class="o">*</span><span class="n">pickler</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">value</span><span class="p">)))</span>
<span class="k">def</span> <span class="nf">encode</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span>
<span class="k">return</span> <span class="n">pickler</span><span class="o">.</span><span class="n">dumps</span><span class="p">((</span>
<span class="n">restriction</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">weight</span><span class="p">,</span>
<span class="n">restriction</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">source</span><span class="p">,</span>
<span class="n">restriction</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">start_position</span><span class="p">,</span>
<span class="n">restriction</span><span class="o">.</span><span class="n">_source_bundle</span><span class="o">.</span><span class="n">stop_position</span><span class="p">))</span>
<span class="k">class</span> <span class="nc">_SDFBoundedSourceRestrictionProvider</span><span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">RestrictionProvider</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A `RestrictionProvider` that is used by SDF for `BoundedSource`.</span>
<span class="sd"> This restriction provider initializes restriction based on input</span>
<span class="sd"> element that is expected to be of BoundedSource type.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">desired_chunk_size</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">restriction_coder</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_desired_chunk_size</span> <span class="o">=</span> <span class="n">desired_chunk_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_restriction_coder</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">restriction_coder</span> <span class="ow">or</span> <span class="n">_SDFBoundedSourceWrapperRestrictionCoder</span><span class="p">())</span>
<span class="k">def</span> <span class="nf">_check_source</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">src</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">src</span><span class="p">,</span> <span class="n">BoundedSource</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span>
<span class="s1">&#39;SDFBoundedSourceRestrictionProvider can only utilize BoundedSource&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">initial_restriction</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element_source</span><span class="p">:</span> <span class="n">BoundedSource</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_check_source</span><span class="p">(</span><span class="n">element_source</span><span class="p">)</span>
<span class="n">range_tracker</span> <span class="o">=</span> <span class="n">element_source</span><span class="o">.</span><span class="n">get_range_tracker</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_SDFBoundedSourceRestriction</span><span class="p">(</span>
<span class="n">SourceBundle</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span>
<span class="n">element_source</span><span class="p">,</span>
<span class="n">range_tracker</span><span class="o">.</span><span class="n">start_position</span><span class="p">(),</span>
<span class="n">range_tracker</span><span class="o">.</span><span class="n">stop_position</span><span class="p">()))</span>
<span class="k">def</span> <span class="nf">create_tracker</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span>
<span class="k">return</span> <span class="n">_SDFBoundedSourceRestrictionTracker</span><span class="p">(</span><span class="n">restriction</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_desired_chunk_size</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">estimated_size</span> <span class="o">=</span> <span class="n">restriction</span><span class="o">.</span><span class="n">source</span><span class="p">()</span><span class="o">.</span><span class="n">estimate_size</span><span class="p">()</span>
<span class="k">except</span> <span class="ne">NotImplementedError</span><span class="p">:</span>
<span class="n">estimated_size</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_desired_chunk_size</span> <span class="o">=</span> <span class="n">Read</span><span class="o">.</span><span class="n">get_desired_chunk_size</span><span class="p">(</span><span class="n">estimated_size</span><span class="p">)</span>
<span class="c1"># Invoke source.split to get initial splitting results.</span>
<span class="n">source_bundles</span> <span class="o">=</span> <span class="n">restriction</span><span class="o">.</span><span class="n">source</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_desired_chunk_size</span><span class="p">)</span>
<span class="k">for</span> <span class="n">source_bundle</span> <span class="ow">in</span> <span class="n">source_bundles</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">_SDFBoundedSourceRestriction</span><span class="p">(</span><span class="n">source_bundle</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">restriction_size</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span>
<span class="k">return</span> <span class="n">restriction</span><span class="o">.</span><span class="n">weight</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">restriction_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_restriction_coder</span>
<span class="k">class</span> <span class="nc">SDFBoundedSourceReader</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A ``PTransform`` that uses SDF to read from each ``BoundedSource`` in a</span>
<span class="sd"> PCollection.</span>
<span class="sd"> NOTE: This transform can only be used with beam_fn_api enabled.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">data_to_display</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_data_to_display</span> <span class="o">=</span> <span class="n">data_to_display</span> <span class="ow">or</span> <span class="p">{}</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">_create_sdf_bounded_source_dofn</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">class</span> <span class="nc">SDFBoundedSourceDoFn</span><span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dd</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_dd</span> <span class="o">=</span> <span class="n">dd</span>
<span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dd</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">unused_element</span><span class="p">,</span>
<span class="n">restriction_tracker</span><span class="o">=</span><span class="n">core</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">RestrictionParam</span><span class="p">(</span>
<span class="n">_SDFBoundedSourceRestrictionProvider</span><span class="p">())):</span>
<span class="n">current_restriction</span> <span class="o">=</span> <span class="n">restriction_tracker</span><span class="o">.</span><span class="n">current_restriction</span><span class="p">()</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">current_restriction</span><span class="p">,</span> <span class="n">_SDFBoundedSourceRestriction</span><span class="p">)</span>
<span class="k">return</span> <span class="n">current_restriction</span><span class="o">.</span><span class="n">source</span><span class="p">()</span><span class="o">.</span><span class="n">read</span><span class="p">(</span>
<span class="n">current_restriction</span><span class="o">.</span><span class="n">range_tracker</span><span class="p">())</span>
<span class="k">return</span> <span class="n">SDFBoundedSourceDoFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_data_to_display</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pvalue</span><span class="p">):</span>
<span class="k">return</span> <span class="n">pvalue</span> <span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_create_sdf_bounded_source_dofn</span><span class="p">())</span>
<span class="k">def</span> <span class="nf">get_windowing</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_inputs</span><span class="p">):</span>
<span class="k">return</span> <span class="n">core</span><span class="o">.</span><span class="n">Windowing</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span>
<span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_data_to_display</span>
</pre></div>
</div>
</div>
<footer>
<!-- Page footer: copyright notice followed by the Sphinx / Read the Docs theme attribution. -->
<hr/>
<div role="contentinfo">
<p>
&copy; Copyright
</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script type="text/javascript">
// Once the DOM is ready, switch on the theme's navigation handling.
// NOTE(review): the `true` argument presumably enables the sticky sidebar; confirm against theme.js.
jQuery(function () {
var themeNavigation = SphinxRtdTheme.Navigation;
themeNavigation.enable(true);
});
</script>
</body>
</html>