blob: 1cdd7473720c34e8a9365839f52009bfee613fcc [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<!-- charset declaration must come first so it falls within the first 1024 bytes -->
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>apache_beam.io.iobase &mdash; Apache Beam documentation</title>
<!-- type="text/css" is the default for rel="stylesheet" and is omitted;
     link/meta are void elements, so the XHTML-style trailing slash is dropped -->
<link rel="stylesheet" href="../../../_static/css/theme.css">
<link rel="index" title="Index" href="../../../genindex.html">
<link rel="search" title="Search" href="../../../search.html">
<link rel="top" title="Apache Beam documentation" href="../../../index.html">
<link rel="up" title="Module code" href="../../index.html">
<script src="../../../_static/js/modernizr.min.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<!-- Sidebar: project home link, docs search form, and the package/module toctree. -->
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search">
<a href="../../../index.html" class="icon icon-home"> Apache Beam
</a>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
<!-- placeholder text is not an accessible name (it disappears on input and is
     not reliably announced); aria-label gives the field one without altering layout -->
<input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.internal.html">apache_beam.internal package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.tools.html">apache_beam.tools package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li>
</ul>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.version.html">apache_beam.version module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<!-- Mobile top bar: hamburger icon plus project home link. -->
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<!-- NOTE(review): the theme's JS presumably binds a click handler to this
     data-toggle icon; as an <i> it has no accessible name and is not
     keyboard-focusable. Confirm the theme's selector contract before
     converting it to a <button aria-label="Toggle navigation">. -->
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<!-- Breadcrumb trail: Docs » Module code » current page. -->
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../index.html">Module code</a> &raquo;</li>
<li>apache_beam.io.iobase</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<!-- hr is a void element: the trailing slash is ignored by the HTML parser
     and omitted by convention -->
<hr>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.io.iobase</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Sources and sinks.</span>
<span class="sd">A Source manages record-oriented data input from a particular kind of source</span>
<span class="sd">(e.g. a set of files, a database table, etc.). The reader() method of a source</span>
<span class="sd">returns a reader object supporting the iterator protocol; iteration yields</span>
<span class="sd">raw records of unprocessed, serialized data.</span>
<span class="sd">A Sink manages record-oriented data output to a particular kind of sink</span>
<span class="sd">(e.g. a set of files, a database table, etc.). The writer() method of a sink</span>
<span class="sd">returns a writer object supporting writing records of serialized data to</span>
<span class="sd">the sink.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">absolute_import</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">math</span>
<span class="kn">import</span> <span class="nn">random</span>
<span class="kn">import</span> <span class="nn">uuid</span>
<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">object</span>
<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">range</span>
<span class="kn">from</span> <span class="nn">collections</span> <span class="k">import</span> <span class="n">namedtuple</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="k">import</span> <span class="n">coders</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="k">import</span> <span class="n">pvalue</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability</span> <span class="k">import</span> <span class="n">common_urns</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability</span> <span class="k">import</span> <span class="n">python_urns</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability.api</span> <span class="k">import</span> <span class="n">beam_runner_api_pb2</span>
<span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="k">import</span> <span class="n">AsIter</span>
<span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="k">import</span> <span class="n">AsSingleton</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="k">import</span> <span class="n">core</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="k">import</span> <span class="n">ptransform</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="k">import</span> <span class="n">window</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="k">import</span> <span class="n">DisplayDataItem</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="k">import</span> <span class="n">HasDisplayData</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils</span> <span class="k">import</span> <span class="n">urns</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils.windowed_value</span> <span class="k">import</span> <span class="n">WindowedValue</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;BoundedSource&#39;</span><span class="p">,</span> <span class="s1">&#39;RangeTracker&#39;</span><span class="p">,</span> <span class="s1">&#39;Read&#39;</span><span class="p">,</span> <span class="s1">&#39;RestrictionTracker&#39;</span><span class="p">,</span>
<span class="s1">&#39;Sink&#39;</span><span class="p">,</span> <span class="s1">&#39;Write&#39;</span><span class="p">,</span> <span class="s1">&#39;Writer&#39;</span><span class="p">]</span>
<span class="c1"># Encapsulates information about a bundle of a source generated when method</span>
<span class="c1"># BoundedSource.split() is invoked.</span>
<span class="c1"># This is a named 4-tuple that has following fields.</span>
<span class="c1"># * weight - a number that represents the size of the bundle. This value will</span>
<span class="c1"># be used to compare the relative sizes of bundles generated by the</span>
<span class="c1"># current source.</span>
<span class="c1"># The weight returned here could be specified using a unit of your</span>
<span class="c1"># choice (for example, bundles of sizes 100MB, 200MB, and 700MB may</span>
<span class="c1"># specify weights 100, 200, 700 or 1, 2, 7) but all bundles of a</span>
<span class="c1"># source should specify the weight using the same unit.</span>
<span class="c1"># * source - a BoundedSource object for the bundle.</span>
<span class="c1"># * start_position - starting position of the bundle</span>
<span class="c1"># * stop_position - ending position of the bundle.</span>
<span class="c1">#</span>
<span class="c1"># Type for start and stop positions are specific to the bounded source and must</span>
<span class="c1"># be consistent throughout.</span>
<span class="n">SourceBundle</span> <span class="o">=</span> <span class="n">namedtuple</span><span class="p">(</span>
<span class="s1">&#39;SourceBundle&#39;</span><span class="p">,</span>
<span class="s1">&#39;weight source start_position stop_position&#39;</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">SourceBase</span><span class="p">(</span><span class="n">HasDisplayData</span><span class="p">,</span> <span class="n">urns</span><span class="o">.</span><span class="n">RunnerApiFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Base class for all sources that can be passed to beam.io.Read(...).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">urns</span><span class="o">.</span><span class="n">RunnerApiFn</span><span class="o">.</span><span class="n">register_pickle_urn</span><span class="p">(</span><span class="n">python_urns</span><span class="o">.</span><span class="n">PICKLED_SOURCE</span><span class="p">)</span>
<div class="viewcode-block" id="BoundedSource"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource">[docs]</a><span class="k">class</span> <span class="nc">BoundedSource</span><span class="p">(</span><span class="n">SourceBase</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A source that reads a finite amount of input records.</span>
<span class="sd"> This class defines following operations which can be used to read the source</span>
<span class="sd"> efficiently.</span>
<span class="sd"> * Size estimation - method ``estimate_size()`` may return an accurate</span>
<span class="sd"> estimation in bytes for the size of the source.</span>
<span class="sd"> * Splitting into bundles of a given size - method ``split()`` can be used to</span>
<span class="sd"> split the source into a set of sub-sources (bundles) based on a desired</span>
<span class="sd"> bundle size.</span>
<span class="sd"> * Getting a RangeTracker - method ``get_range_tracker()`` should return a</span>
<span class="sd"> ``RangeTracker`` object for a given position range for the position type</span>
<span class="sd"> of the records returned by the source.</span>
<span class="sd"> * Reading the data - method ``read()`` can be used to read data from the</span>
<span class="sd"> source while respecting the boundaries defined by a given</span>
<span class="sd"> ``RangeTracker``.</span>
<span class="sd"> A runner will perform reading the source in two steps.</span>
<span class="sd"> (1) Method ``get_range_tracker()`` will be invoked with start and end</span>
<span class="sd"> positions to obtain a ``RangeTracker`` for the range of positions the</span>
<span class="sd"> runner intends to read. Source must define a default initial start and end</span>
<span class="sd"> position range. These positions must be used if the start and/or end</span>
<span class="sd"> positions passed to the method ``get_range_tracker()`` are ``None``</span>
<span class="sd"> (2) Method read() will be invoked with the ``RangeTracker`` obtained in the</span>
<span class="sd"> previous step.</span>
<span class="sd"> **Mutability**</span>
<span class="sd"> A ``BoundedSource`` object should not be mutated while</span>
<span class="sd"> its methods (for example, ``read()``) are being invoked by a runner. Runner</span>
<span class="sd"> implementations may invoke methods of ``BoundedSource`` objects through</span>
<span class="sd"> multi-threaded and/or reentrant execution modes.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="BoundedSource.estimate_size"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.estimate_size">[docs]</a> <span class="k">def</span> <span class="nf">estimate_size</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Estimates the size of source in bytes.</span>
<span class="sd"> An estimate of the total size (in bytes) of the data that would be read</span>
<span class="sd"> from this source. This estimate is in terms of external storage size,</span>
<span class="sd"> before performing decompression or other processing.</span>
<span class="sd"> Returns:</span>
<span class="sd"> estimated size of the source if the size can be determined, ``None``</span>
<span class="sd"> otherwise.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="BoundedSource.split"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.split">[docs]</a> <span class="k">def</span> <span class="nf">split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">desired_bundle_size</span><span class="p">,</span> <span class="n">start_position</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">stop_position</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Splits the source into a set of bundles.</span>
<span class="sd"> Bundles should be approximately of size ``desired_bundle_size`` bytes.</span>
<span class="sd"> Args:</span>
<span class="sd"> desired_bundle_size: the desired size (in bytes) of the bundles returned.</span>
<span class="sd"> start_position: if specified the given position must be used as the</span>
<span class="sd"> starting position of the first bundle.</span>
<span class="sd"> stop_position: if specified the given position must be used as the ending</span>
<span class="sd"> position of the last bundle.</span>
<span class="sd"> Returns:</span>
<span class="sd"> an iterator of objects of type &#39;SourceBundle&#39; that gives information about</span>
<span class="sd"> the generated bundles.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="BoundedSource.get_range_tracker"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.get_range_tracker">[docs]</a> <span class="k">def</span> <span class="nf">get_range_tracker</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">start_position</span><span class="p">,</span> <span class="n">stop_position</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns a RangeTracker for a given position range.</span>
<span class="sd"> Framework may invoke ``read()`` method with the RangeTracker object returned</span>
<span class="sd"> here to read data from the source.</span>
<span class="sd"> Args:</span>
<span class="sd"> start_position: starting position of the range. If &#39;None&#39; default start</span>
<span class="sd"> position of the source must be used.</span>
<span class="sd"> stop_position: ending position of the range. If &#39;None&#39; default stop</span>
<span class="sd"> position of the source must be used.</span>
<span class="sd"> Returns:</span>
<span class="sd"> a ``RangeTracker`` for the given position range.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="BoundedSource.read"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.read">[docs]</a> <span class="k">def</span> <span class="nf">read</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">range_tracker</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns an iterator that reads data from the source.</span>
<span class="sd"> The returned set of data must respect the boundaries defined by the given</span>
<span class="sd"> ``RangeTracker`` object. For example:</span>
<span class="sd"> * Returned set of data must be for the range</span>
<span class="sd"> ``[range_tracker.start_position, range_tracker.stop_position)``. Note</span>
<span class="sd"> that a source may decide to return records that start after</span>
<span class="sd"> ``range_tracker.stop_position``. See documentation in class</span>
<span class="sd"> ``RangeTracker`` for more details. Also, note that framework might</span>
<span class="sd"> invoke ``range_tracker.try_split()`` to perform dynamic split</span>
<span class="sd"> operations. range_tracker.stop_position may be updated</span>
<span class="sd"> dynamically due to successful dynamic split operations.</span>
<span class="sd"> * Method ``range_tracker.try_split()`` must be invoked for every record</span>
<span class="sd"> that starts at a split point.</span>
<span class="sd"> * Method ``range_tracker.record_current_position()`` may be invoked for</span>
<span class="sd"> records that do not start at split points.</span>
<span class="sd"> Args:</span>
<span class="sd"> range_tracker: a ``RangeTracker`` whose boundaries must be respected</span>
<span class="sd"> when reading data from the source. A runner that reads this</span>
<span class="sd"> source must pass a ``RangeTracker`` object that is not</span>
<span class="sd"> ``None``.</span>
<span class="sd"> Returns:</span>
<span class="sd"> an iterator of data read by the source.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="BoundedSource.default_output_coder"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.default_output_coder">[docs]</a> <span class="k">def</span> <span class="nf">default_output_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Coder that should be used for the records returned by the source.</span>
<span class="sd"> Should be overridden by sources that produce objects that can be encoded</span>
<span class="sd"> more efficiently than pickling.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="nb">object</span><span class="p">)</span></div>
<div class="viewcode-block" id="BoundedSource.is_bounded"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.BoundedSource.is_bounded">[docs]</a> <span class="k">def</span> <span class="nf">is_bounded</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="kc">True</span></div></div>
<div class="viewcode-block" id="RangeTracker"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker">[docs]</a><span class="k">class</span> <span class="nc">RangeTracker</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A thread safe object used by Dataflow source framework.</span>
<span class="sd"> A Dataflow source is defined using a &#39;&#39;BoundedSource&#39;&#39; and a &#39;&#39;RangeTracker&#39;&#39;</span>
<span class="sd"> pair. A &#39;&#39;RangeTracker&#39;&#39; is used by Dataflow source framework to perform</span>
<span class="sd"> dynamic work rebalancing of position-based sources.</span>
<span class="sd"> **Position-based sources**</span>
<span class="sd"> A position-based source is one where the source can be described by a range</span>
<span class="sd"> of positions of an ordered type and the records returned by the reader can be</span>
<span class="sd"> described by positions of the same type.</span>
<span class="sd"> In case a record occupies a range of positions in the source, the most</span>
<span class="sd"> important thing about the record is the position where it starts.</span>
<span class="sd"> Defining the semantics of positions for a source is entirely up to the source</span>
<span class="sd"> class, however the chosen definitions have to obey certain properties in order</span>
<span class="sd"> to make it possible to correctly split the source into parts, including</span>
<span class="sd"> dynamic splitting. Two main aspects need to be defined:</span>
<span class="sd"> 1. How to assign starting positions to records.</span>
<span class="sd"> 2. Which records should be read by a source with a range &#39;[A, B)&#39;.</span>
<span class="sd"> Moreover, reading a range must be *efficient*, i.e., the performance of</span>
<span class="sd"> reading a range should not significantly depend on the location of the range.</span>
<span class="sd"> For example, reading the range [A, B) should not require reading all data</span>
<span class="sd"> before &#39;A&#39;.</span>
<span class="sd"> The sections below explain exactly what properties these definitions must</span>
<span class="sd"> satisfy, and how to use a ``RangeTracker`` with a properly defined source.</span>
<span class="sd"> **Properties of position-based sources**</span>
<span class="sd"> The main requirement for position-based sources is *associativity*: reading</span>
<span class="sd"> records from &#39;[A, B)&#39; and records from &#39;[B, C)&#39; should give the same</span>
<span class="sd"> records as reading from &#39;[A, C)&#39;, where &#39;A &lt;= B &lt;= C&#39;. This property</span>
<span class="sd"> ensures that no matter how a range of positions is split into arbitrarily many</span>
<span class="sd"> sub-ranges, the total set of records described by them stays the same.</span>
<span class="sd"> The other important property is how the source&#39;s range relates to positions of</span>
<span class="sd"> records in the source. In many sources each record can be identified by a</span>
<span class="sd"> unique starting position. In this case:</span>
<span class="sd"> * All records returned by a source &#39;[A, B)&#39; must have starting positions in</span>
<span class="sd"> this range.</span>
<span class="sd"> * All but the last record should end within this range. The last record may or</span>
<span class="sd"> may not extend past the end of the range.</span>
<span class="sd"> * Records should not overlap.</span>
<span class="sd"> Such sources should define &quot;read &#39;[A, B)&#39;&quot; as &quot;read from the first record</span>
<span class="sd"> starting at or after &#39;A&#39;, up to but not including the first record starting</span>
<span class="sd"> at or after &#39;B&#39;&quot;.</span>
<span class="sd"> Some examples of such sources include reading lines or CSV from a text file,</span>
<span class="sd"> reading keys and values from a BigTable, etc.</span>
<span class="sd"> The concept of *split points* allows us to extend the definitions for dealing</span>
<span class="sd"> with sources where some records cannot be identified by a unique starting</span>
<span class="sd"> position.</span>
<span class="sd"> In all cases, all records returned by a source &#39;[A, B)&#39; must *start* at or</span>
<span class="sd"> after &#39;A&#39;.</span>
<span class="sd"> **Split points**</span>
<span class="sd"> Some sources may have records that are not directly addressable. For example,</span>
<span class="sd"> imagine a file format consisting of a sequence of compressed blocks. Each</span>
<span class="sd"> block can be assigned an offset, but records within the block cannot be</span>
<span class="sd"> directly addressed without decompressing the block. Let us refer to this</span>
<span class="sd"> hypothetical format as *CBF (Compressed Blocks Format)*.</span>
<span class="sd"> Many such formats can still satisfy the associativity property. For example,</span>
<span class="sd"> in CBF, reading &#39;[A, B)&#39; can mean &quot;read all the records in all blocks whose</span>
<span class="sd"> starting offset is in &#39;[A, B)&#39;&quot;.</span>
<span class="sd"> To support such complex formats, we introduce the notion of *split points*. We</span>
<span class="sd"> say that a record is a split point if there exists a position &#39;A&#39; such that</span>
<span class="sd"> the record is the first one to be returned when reading the range</span>
<span class="sd"> &#39;[A, infinity)&#39;. In CBF, the only split points would be the first records</span>
<span class="sd"> in each block.</span>
<span class="sd"> Split points allow us to define the meaning of a record&#39;s position and a</span>
<span class="sd"> source&#39;s range in all cases:</span>
<span class="sd"> * For a record that is at a split point, its position is defined to be the</span>
<span class="sd"> largest &#39;A&#39; such that reading a source with the range &#39;[A, infinity)&#39;</span>
<span class="sd"> returns this record.</span>
<span class="sd"> * Positions of other records are only required to be non-decreasing.</span>
<span class="sd"> * Reading the source &#39;[A, B)&#39; must return records starting from the first</span>
<span class="sd"> split point at or after &#39;A&#39;, up to but not including the first split point</span>
<span class="sd"> at or after &#39;B&#39;. In particular, this means that the first record returned</span>
<span class="sd"> by a source MUST always be a split point.</span>
<span class="sd"> * Positions of split points must be unique.</span>
<span class="sd"> As a result, for any decomposition of the full range of the source into</span>
<span class="sd"> position ranges, the total set of records will be the full set of records in</span>
<span class="sd"> the source, and each record will be read exactly once.</span>
<span class="sd"> **Consumed positions**</span>
<span class="sd"> As the source is being read, and records read from it are being passed to the</span>
<span class="sd"> downstream transforms in the pipeline, we say that positions in the source are</span>
<span class="sd"> being *consumed*. When a reader has read a record (or promised to a caller</span>
<span class="sd"> that a record will be returned), positions up to and including the record&#39;s</span>
<span class="sd"> start position are considered *consumed*.</span>
<span class="sd"> Dynamic splitting can happen only at *unconsumed* positions. If the reader</span>
<span class="sd"> just returned a record at offset 42 in a file, dynamic splitting can happen</span>
<span class="sd"> only at offset 43 or beyond, as otherwise that record could be read twice (by</span>
<span class="sd"> the current reader and by a reader of the task starting at 43).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">SPLIT_POINTS_UNKNOWN</span> <span class="o">=</span> <span class="nb">object</span><span class="p">()</span>
<div class="viewcode-block" id="RangeTracker.start_position"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.start_position">[docs]</a> <span class="k">def</span> <span class="nf">start_position</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns the starting position of the current range, inclusive.&quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div>
<div class="viewcode-block" id="RangeTracker.stop_position"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.stop_position">[docs]</a> <span class="k">def</span> <span class="nf">stop_position</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns the ending position of the current range, exclusive.&quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div>
<div class="viewcode-block" id="RangeTracker.try_claim"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.try_claim">[docs]</a> <span class="k">def</span> <span class="nf">try_claim</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">position</span><span class="p">):</span> <span class="c1"># pylint: disable=unused-argument</span>
<span class="sd">&quot;&quot;&quot;Atomically determines if a record at a split point is within the range.</span>
<span class="sd"> This method should be called **if and only if** the record is at a split</span>
<span class="sd"> point. This method may modify the internal state of the ``RangeTracker`` by</span>
<span class="sd"> updating the last-consumed position to ``position``.</span>
<span class="sd"> ** Thread safety **</span>
<span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span>
<span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span>
<span class="sd"> lock object.</span>
<span class="sd"> Args:</span>
<span class="sd"> position: starting position of a record being read by a source.</span>
<span class="sd"> Returns:</span>
<span class="sd"> ``True``, if the given position falls within the current range, returns</span>
<span class="sd"> ``False`` otherwise.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RangeTracker.set_current_position"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.set_current_position">[docs]</a> <span class="k">def</span> <span class="nf">set_current_position</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">position</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Updates the last-consumed position to the given position.</span>
<span class="sd"> A source may invoke this method for records that do not start at split</span>
<span class="sd"> points. This may modify the internal state of the ``RangeTracker``. If the</span>
<span class="sd"> record starts at a split point, method ``try_claim()`` **must** be invoked</span>
<span class="sd"> instead of this method.</span>
<span class="sd"> Args:</span>
<span class="sd"> position: starting position of a record being read by a source.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RangeTracker.position_at_fraction"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.position_at_fraction">[docs]</a> <span class="k">def</span> <span class="nf">position_at_fraction</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fraction</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns the position at the given fraction.</span>
<span class="sd"> Given a fraction within the range [0.0, 1.0) this method will return the</span>
<span class="sd"> position at the given fraction compared to the position range</span>
<span class="sd"> [self.start_position, self.stop_position).</span>
<span class="sd"> ** Thread safety **</span>
<span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span>
<span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span>
<span class="sd"> lock object.</span>
<span class="sd"> Args:</span>
<span class="sd"> fraction: a float value within the range [0.0, 1.0).</span>
<span class="sd"> Returns:</span>
<span class="sd"> a position within the range [self.start_position, self.stop_position).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RangeTracker.try_split"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.try_split">[docs]</a> <span class="k">def</span> <span class="nf">try_split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">position</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Atomically splits the current range.</span>
<span class="sd"> Determines a position to split the current range, split_position, based on</span>
<span class="sd"> the given position. In most cases split_position and position will be the</span>
<span class="sd"> same.</span>
<span class="sd"> Splits the current range &#39;[self.start_position, self.stop_position)&#39;</span>
<span class="sd"> into a &quot;primary&quot; part &#39;[self.start_position, split_position)&#39; and a</span>
<span class="sd"> &quot;residual&quot; part &#39;[split_position, self.stop_position)&#39;, assuming the</span>
<span class="sd"> current last-consumed position is within</span>
<span class="sd"> &#39;[self.start_position, split_position)&#39; (i.e., split_position has not been</span>
<span class="sd"> consumed yet).</span>
<span class="sd"> If successful, updates the current range to be the primary and returns a</span>
<span class="sd"> tuple (split_position, split_fraction). split_fraction should be the</span>
<span class="sd"> fraction of size of range &#39;[self.start_position, split_position)&#39; compared</span>
<span class="sd"> to the original (before split) range</span>
<span class="sd"> &#39;[self.start_position, self.stop_position)&#39;.</span>
<span class="sd"> If the split_position has already been consumed, returns ``None``.</span>
<span class="sd"> ** Thread safety **</span>
<span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span>
<span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span>
<span class="sd"> lock object.</span>
<span class="sd"> Args:</span>
<span class="sd"> position: suggested position where the current range should try to</span>
<span class="sd"> be split at.</span>
<span class="sd"> Returns:</span>
<span class="sd"> a tuple containing the split position and split fraction if split is</span>
<span class="sd"> successful. Returns ``None`` otherwise.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RangeTracker.fraction_consumed"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.fraction_consumed">[docs]</a> <span class="k">def</span> <span class="nf">fraction_consumed</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns the approximate fraction of consumed positions in the source.</span>
<span class="sd"> ** Thread safety **</span>
<span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span>
<span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span>
<span class="sd"> lock object.</span>
<span class="sd"> Returns:</span>
<span class="sd"> the approximate fraction of positions that have been consumed by</span>
<span class="sd"> successful &#39;try_split()&#39; and &#39;report_current_position()&#39; calls, or</span>
<span class="sd"> 0.0 if no such calls have happened.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RangeTracker.split_points"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.split_points">[docs]</a> <span class="k">def</span> <span class="nf">split_points</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Gives the number of split points consumed and remaining.</span>
<span class="sd"> For a ``RangeTracker`` used by a ``BoundedSource`` (within a</span>
<span class="sd"> ``BoundedSource.read()`` invocation) this method produces a 2-tuple that</span>
<span class="sd"> gives the number of split points consumed by the ``BoundedSource`` and the</span>
<span class="sd"> number of split points remaining within the range of the ``RangeTracker``</span>
<span class="sd"> that has not been consumed by the ``BoundedSource``.</span>
<span class="sd"> More specifically, given that the position of the current record being read</span>
<span class="sd"> by ``BoundedSource`` is current_position this method produces a tuple that</span>
<span class="sd"> consists of</span>
<span class="sd"> (1) number of split points in the range [self.start_position(),</span>
<span class="sd"> current_position) without including the split point that is currently being</span>
<span class="sd"> consumed. This represents the total amount of parallelism in the consumed</span>
<span class="sd"> part of the source.</span>
<span class="sd"> (2) number of split points within the range</span>
<span class="sd"> [current_position, self.stop_position()) including the split point that is</span>
<span class="sd"> currently being consumed. This represents the total amount of parallelism in</span>
<span class="sd"> the unconsumed part of the source.</span>
<span class="sd"> Methods of the class ``RangeTracker`` including this method may get invoked</span>
<span class="sd"> by different threads, hence must be made thread-safe, e.g. by using a single</span>
<span class="sd"> lock object.</span>
<span class="sd"> ** General information about consumed and remaining number of split</span>
<span class="sd"> points returned by this method. **</span>
<span class="sd"> * Before a source read (``BoundedSource.read()`` invocation) claims the</span>
<span class="sd"> first split point, number of consumed split points is 0. This condition</span>
<span class="sd"> holds independent of whether the input is &quot;splittable&quot;. A splittable</span>
<span class="sd"> source is a source that has more than one split point.</span>
<span class="sd"> * Any source read that has only claimed one split point has 0 consumed</span>
<span class="sd"> split points since the first split point is the current split point and</span>
<span class="sd"> is still being processed. This condition holds independent of whether</span>
<span class="sd"> the input is splittable.</span>
<span class="sd"> * For an empty source read which never invokes</span>
<span class="sd"> ``RangeTracker.try_claim()``, the consumed number of split points is 0.</span>
<span class="sd"> This condition holds independent of whether the input is splittable.</span>
<span class="sd"> * For a source read which has invoked ``RangeTracker.try_claim()`` n</span>
<span class="sd"> times, the consumed number of split points is n - 1.</span>
<span class="sd"> * If a ``BoundedSource`` sets a callback through function</span>
<span class="sd"> ``set_split_points_unclaimed_callback()``, ``RangeTracker`` can use that</span>
<span class="sd"> callback when determining remaining number of split points.</span>
<span class="sd"> * Remaining split points should include the split point that is currently</span>
<span class="sd"> being consumed by the source read. Hence if the above callback returns</span>
<span class="sd"> an integer value n, remaining number of split points should be (n + 1).</span>
<span class="sd"> * After last split point is claimed remaining split points becomes 1,</span>
<span class="sd"> because this unfinished read itself represents an unfinished split</span>
<span class="sd"> point.</span>
<span class="sd"> * After all records of the source has been consumed, remaining number of</span>
<span class="sd"> split points becomes 0 and consumed number of split points becomes equal</span>
<span class="sd"> to the total number of split points within the range being read by the</span>
<span class="sd"> source. This method does not address this condition and will continue to</span>
<span class="sd"> report number of consumed split points as</span>
<span class="sd"> (&quot;total number of split points&quot; - 1) and number of remaining split</span>
<span class="sd"> points as 1. A runner that performs the reading of the source can</span>
<span class="sd"> detect when all records have been consumed and adjust remaining and</span>
<span class="sd"> consumed number of split points accordingly.</span>
<span class="sd"> ** Examples **</span>
<span class="sd"> (1) A &quot;perfectly splittable&quot; input which can be read in parallel down to the</span>
<span class="sd"> individual records.</span>
<span class="sd"> Consider a perfectly splittable input that consists of 50 split points.</span>
<span class="sd"> * Before a source read (``BoundedSource.read()`` invocation) claims the</span>
<span class="sd"> first split point, number of consumed split points is 0 and number of</span>
<span class="sd"> remaining split points is 50.</span>
<span class="sd"> * After claiming first split point, consumed number of split points is 0</span>
<span class="sd"> and remaining number of split points is 50.</span>
<span class="sd"> * After claiming split point #30, consumed number of split points is 29</span>
<span class="sd"> and remaining number of split points is 21.</span>
<span class="sd"> * After claiming all 50 split points, consumed number of split points is</span>
<span class="sd"> 49 and remaining number of split points is 1.</span>
<span class="sd"> (2) a &quot;block-compressed&quot; file format such as ``avroio``, in which a block of</span>
<span class="sd"> records has to be read as a whole, but different blocks can be read in</span>
<span class="sd"> parallel.</span>
<span class="sd"> Consider a block compressed input that consists of 5 blocks.</span>
<span class="sd"> * Before a source read (``BoundedSource.read()`` invocation) claims the</span>
<span class="sd"> first split point (first block), number of consumed split points is 0 and</span>
<span class="sd"> number of remaining split points is 5.</span>
<span class="sd"> * After claiming first split point, consumed number of split points is 0</span>
<span class="sd"> and remaining number of split points is 5.</span>
<span class="sd"> * After claiming split point #3, consumed number of split points is 2</span>
<span class="sd"> and remaining number of split points is 3.</span>
<span class="sd"> * After claiming all 5 split points, consumed number of split points is</span>
<span class="sd"> 4 and remaining number of split points is 1.</span>
<span class="sd"> (3) an &quot;unsplittable&quot; input such as a cursor in a database or a gzip</span>
<span class="sd"> compressed file.</span>
<span class="sd"> Such an input is considered to have only a single split point. Number of</span>
<span class="sd"> consumed split points is always 0 and number of remaining split points</span>
<span class="sd"> is always 1.</span>
<span class="sd"> By default ``RangeTracker`` returns ``RangeTracker.SPLIT_POINTS_UNKNOWN`` for</span>
<span class="sd"> both consumed and remaining number of split points, which indicates that the</span>
<span class="sd"> number of split points consumed and remaining is unknown.</span>
<span class="sd"> Returns:</span>
<span class="sd"> A pair that gives consumed and remaining number of split points. Consumed</span>
<span class="sd"> number of split points should be an integer larger than or equal to zero</span>
<span class="sd"> or ``RangeTracker.SPLIT_POINTS_UNKNOWN``. Remaining number of split points</span>
<span class="sd"> should be an integer larger than zero or</span>
<span class="sd"> ``RangeTracker.SPLIT_POINTS_UNKNOWN``.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="p">(</span><span class="n">RangeTracker</span><span class="o">.</span><span class="n">SPLIT_POINTS_UNKNOWN</span><span class="p">,</span>
<span class="n">RangeTracker</span><span class="o">.</span><span class="n">SPLIT_POINTS_UNKNOWN</span><span class="p">)</span></div>
<div class="viewcode-block" id="RangeTracker.set_split_points_unclaimed_callback"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker.set_split_points_unclaimed_callback">[docs]</a> <span class="k">def</span> <span class="nf">set_split_points_unclaimed_callback</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">callback</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Sets a callback for determining the unclaimed number of split points.</span>
<span class="sd"> By invoking this function, a ``BoundedSource`` can set a callback function</span>
<span class="sd"> that may get invoked by the ``RangeTracker`` to determine the number of</span>
<span class="sd"> unclaimed split points. A split point is unclaimed if</span>
<span class="sd"> ``RangeTracker.try_claim()`` method has not been successfully invoked for</span>
<span class="sd"> that particular split point. The callback function accepts a single</span>
<span class="sd"> parameter, a stop position for the BoundedSource (stop_position). If the</span>
<span class="sd"> record currently being consumed by the ``BoundedSource`` is at position</span>
<span class="sd"> current_position, callback should return the number of split points within</span>
<span class="sd"> the range (current_position, stop_position). Note that, this should not</span>
<span class="sd"> include the split point that is currently being consumed by the source.</span>
<span class="sd"> This function must be implemented by subclasses before being used.</span>
<span class="sd"> Args:</span>
<span class="sd"> callback: a function that takes a single parameter, a stop position,</span>
<span class="sd"> and returns unclaimed number of split points for the source read</span>
<span class="sd"> operation that is calling this function. Value returned from</span>
<span class="sd"> callback should be either an integer larger than or equal to</span>
<span class="sd"> zero or ``RangeTracker.SPLIT_POINTS_UNKNOWN``.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div>
<div class="viewcode-block" id="Sink"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink">[docs]</a><span class="k">class</span> <span class="nc">Sink</span><span class="p">(</span><span class="n">HasDisplayData</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;This class is deprecated, no backwards-compatibility guarantees.</span>
<span class="sd"> A resource that can be written to using the ``beam.io.Write`` transform.</span>
<span class="sd"> Here ``beam`` stands for Apache Beam Python code imported in following manner.</span>
<span class="sd"> ``import apache_beam as beam``.</span>
<span class="sd"> A parallel write to an ``iobase.Sink`` consists of three phases:</span>
<span class="sd"> 1. A sequential *initialization* phase (e.g., creating a temporary output</span>
<span class="sd"> directory, etc.)</span>
<span class="sd"> 2. A parallel write phase where workers write *bundles* of records</span>
<span class="sd"> 3. A sequential *finalization* phase (e.g., committing the writes, merging</span>
<span class="sd"> output files, etc.)</span>
<span class="sd"> Implementing a new sink requires extending two classes.</span>
<span class="sd"> 1. iobase.Sink</span>
<span class="sd"> ``iobase.Sink`` is an immutable logical description of the location/resource</span>
<span class="sd"> to write to. Depending on the type of sink, it may contain fields such as the</span>
<span class="sd"> path to an output directory on a filesystem, a database table name,</span>
<span class="sd"> etc. ``iobase.Sink`` provides methods for performing a write operation to the</span>
<span class="sd"> sink described by it. To this end, implementors of an extension of</span>
<span class="sd"> ``iobase.Sink`` must implement three methods:</span>
<span class="sd"> ``initialize_write()``, ``open_writer()``, and ``finalize_write()``.</span>
<span class="sd"> 2. iobase.Writer</span>
<span class="sd"> ``iobase.Writer`` is used to write a single bundle of records. An</span>
<span class="sd"> ``iobase.Writer`` defines two methods: ``write()`` which writes a</span>
<span class="sd"> single record from the bundle and ``close()`` which is called once</span>
<span class="sd"> at the end of writing a bundle.</span>
<span class="sd"> See also ``apache_beam.io.filebasedsink.FileBasedSink`` which provides a</span>
<span class="sd"> simpler API for writing sinks that produce files.</span>
<span class="sd"> **Execution of the Write transform**</span>
<span class="sd"> ``initialize_write()``, ``pre_finalize()``, and ``finalize_write()`` are</span>
<span class="sd"> conceptually called once. However, implementors must</span>
<span class="sd"> ensure that these methods are *idempotent*, as they may be called multiple</span>
<span class="sd"> times on different machines in the case of failure/retry. A method may be</span>
<span class="sd"> called more than once concurrently, in which case it&#39;s okay to have a</span>
<span class="sd"> transient failure (such as due to a race condition). This failure should not</span>
<span class="sd"> prevent subsequent retries from succeeding.</span>
<span class="sd"> ``initialize_write()`` should perform any initialization that needs to be done</span>
<span class="sd"> prior to writing to the sink. ``initialize_write()`` may return a result</span>
<span class="sd"> (let&#39;s call this ``init_result``) that contains any parameters it wants to</span>
<span class="sd"> pass on to its writers about the sink. For example, a sink that writes to a</span>
<span class="sd"> file system may return an ``init_result`` that contains a dynamically</span>
<span class="sd"> generated unique directory to which data should be written.</span>
<span class="sd"> To perform writing of a bundle of elements, Dataflow execution engine will</span>
<span class="sd"> create an ``iobase.Writer`` using the implementation of</span>
<span class="sd"> ``iobase.Sink.open_writer()``. When invoking ``open_writer()`` execution</span>
<span class="sd"> engine will provide the ``init_result`` returned by ``initialize_write()``</span>
<span class="sd"> invocation as well as a *bundle id* (let&#39;s call this ``bundle_id``) that is</span>
<span class="sd"> unique for each invocation of ``open_writer()``.</span>
<span class="sd"> Execution engine will then invoke ``iobase.Writer.write()`` implementation for</span>
<span class="sd"> each element that has to be written. Once all elements of a bundle are</span>
<span class="sd"> written, execution engine will invoke ``iobase.Writer.close()`` implementation</span>
<span class="sd"> which should return a result (let&#39;s call this ``write_result``) that contains</span>
<span class="sd"> information that encodes the result of the write and, in most cases, some</span>
<span class="sd"> encoding of the unique bundle id. For example, if each bundle is written to a</span>
<span class="sd"> unique temporary file, ``close()`` method may return an object that contains</span>
<span class="sd"> the temporary file name. After writing of all bundles is complete, execution</span>
<span class="sd"> engine will invoke ``pre_finalize()`` and then ``finalize_write()``</span>
<span class="sd"> implementation.</span>
<span class="sd"> The execution of a write transform can be illustrated using following pseudo</span>
<span class="sd"> code (assume that the outer for loop happens in parallel across many</span>
<span class="sd"> machines)::</span>
<span class="sd"> init_result = sink.initialize_write()</span>
<span class="sd"> write_results = []</span>
<span class="sd"> for bundle in partition(pcoll):</span>
<span class="sd"> writer = sink.open_writer(init_result, generate_bundle_id())</span>
<span class="sd"> for elem in bundle:</span>
<span class="sd"> writer.write(elem)</span>
<span class="sd"> write_results.append(writer.close())</span>
<span class="sd"> pre_finalize_result = sink.pre_finalize(init_result, write_results)</span>
<span class="sd"> sink.finalize_write(init_result, write_results, pre_finalize_result)</span>
<span class="sd"> **init_result**</span>
<span class="sd"> Methods of &#39;iobase.Sink&#39; should agree on the &#39;init_result&#39; type that will be</span>
<span class="sd"> returned when initializing the sink. This type can be a client-defined object</span>
<span class="sd"> or an existing type. The returned type must be picklable using Dataflow coder</span>
<span class="sd"> ``coders.PickleCoder``. Returning an init_result is optional.</span>
<span class="sd"> **bundle_id**</span>
<span class="sd"> In order to ensure fault-tolerance, a bundle may be executed multiple times</span>
<span class="sd"> (e.g., in the event of failure/retry or for redundancy). However, exactly one</span>
<span class="sd"> of these executions will have its result passed to the</span>
<span class="sd"> ``iobase.Sink.finalize_write()`` method. Each call to</span>
<span class="sd"> ``iobase.Sink.open_writer()`` is passed a unique bundle id when it is called</span>
<span class="sd"> by the ``WriteImpl`` transform, so even redundant or retried bundles will have</span>
<span class="sd"> a unique way of identifying their output.</span>
<span class="sd"> The bundle id should be used to guarantee that a bundle&#39;s output is unique.</span>
<span class="sd"> This uniqueness guarantee is important; if a bundle is to be output to a file,</span>
<span class="sd"> for example, the name of the file must be unique to avoid conflicts with other</span>
<span class="sd"> writers. The bundle id should be encoded in the writer result returned by the</span>
<span class="sd"> writer and subsequently used by the ``finalize_write()`` method to identify</span>
<span class="sd"> the results of successful writes.</span>
<span class="sd"> For example, consider the scenario where a Writer writes files containing</span>
<span class="sd"> serialized records and the ``finalize_write()`` is to merge or rename these</span>
<span class="sd"> output files. In this case, a writer may use its unique id to name its output</span>
<span class="sd"> file (to avoid conflicts) and return the name of the file it wrote as its</span>
<span class="sd"> writer result. The ``finalize_write()`` will then receive an ``Iterable`` of</span>
<span class="sd"> output file names that it can then merge or rename using some bundle naming</span>
<span class="sd"> scheme.</span>
<span class="sd"> **write_result**</span>
<span class="sd"> ``iobase.Writer.close()`` and ``finalize_write()`` implementations must agree</span>
<span class="sd"> on type of the ``write_result`` object returned when invoking</span>
<span class="sd"> ``iobase.Writer.close()``. This type can be a client-defined object or</span>
<span class="sd"> an existing type. The returned type must be picklable using Dataflow coder</span>
<span class="sd"> ``coders.PickleCoder``. Returning a ``write_result`` when</span>
<span class="sd"> ``iobase.Writer.close()`` is invoked is optional but if unique</span>
<span class="sd"> ``write_result`` objects are not returned, sink should guarantee idempotency</span>
<span class="sd"> when same bundle is written multiple times due to failure/retry or redundancy.</span>
<span class="sd"> **More information**</span>
<span class="sd"> For more information on creating new sinks please refer to the official</span>
<span class="sd"> documentation at</span>
<span class="sd"> ``https://beam.apache.org/documentation/sdks/python-custom-io#creating-sinks``</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="Sink.initialize_write"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink.initialize_write">[docs]</a> <span class="k">def</span> <span class="nf">initialize_write</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initializes the sink before writing begins.</span>
<span class="sd"> Invoked before any data is written to the sink.</span>
<span class="sd"> Please see documentation in ``iobase.Sink`` for an example.</span>
<span class="sd"> Returns:</span>
<span class="sd"> An object that contains any sink specific state generated by</span>
<span class="sd"> initialization. This object will be passed to open_writer() and</span>
<span class="sd"> finalize_write() methods.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="Sink.open_writer"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink.open_writer">[docs]</a> <span class="k">def</span> <span class="nf">open_writer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">init_result</span><span class="p">,</span> <span class="n">uid</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Opens a writer for writing a bundle of elements to the sink.</span>
<span class="sd"> Args:</span>
<span class="sd"> init_result: the result of initialize_write() invocation.</span>
<span class="sd"> uid: a unique identifier generated by the system.</span>
<span class="sd"> Returns:</span>
<span class="sd"> an ``iobase.Writer`` that can be used to write a bundle of records to the</span>
<span class="sd"> current sink.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="Sink.pre_finalize"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink.pre_finalize">[docs]</a> <span class="k">def</span> <span class="nf">pre_finalize</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">init_result</span><span class="p">,</span> <span class="n">writer_results</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Pre-finalization stage for sink.</span>
<span class="sd"> Called after all bundle writes are complete and before finalize_write.</span>
<span class="sd"> Used to setup and verify filesystem and sink states.</span>
<span class="sd"> Args:</span>
<span class="sd"> init_result: the result of ``initialize_write()`` invocation.</span>
<span class="sd"> writer_results: an iterable containing results of ``Writer.close()``</span>
<span class="sd"> invocations. This will only contain results of successful writes, and</span>
<span class="sd"> will only contain the result of a single successful write for a given</span>
<span class="sd"> bundle.</span>
<span class="sd"> Returns:</span>
<span class="sd"> An object that contains any sink specific state generated.</span>
<span class="sd"> This object will be passed to finalize_write().</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="Sink.finalize_write"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Sink.finalize_write">[docs]</a> <span class="k">def</span> <span class="nf">finalize_write</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">init_result</span><span class="p">,</span> <span class="n">writer_results</span><span class="p">,</span>
<span class="n">pre_finalize_result</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Finalizes the sink after all data is written to it.</span>
<span class="sd"> Given the result of initialization and an iterable of results from bundle</span>
<span class="sd"> writes, performs finalization after writing and closes the sink. Called</span>
<span class="sd"> after all bundle writes are complete.</span>
<span class="sd"> The bundle write results that are passed to finalize are those returned by</span>
<span class="sd"> bundles that completed successfully. Although bundles may have been run</span>
<span class="sd"> multiple times (for fault-tolerance), only one writer result will be passed</span>
<span class="sd"> to finalize for each bundle. An implementation of finalize should perform</span>
<span class="sd"> clean up of any failed and successfully retried bundles. Note that these</span>
<span class="sd"> failed bundles will not have their writer result passed to finalize, so</span>
<span class="sd"> finalize should be capable of locating any temporary/partial output written</span>
<span class="sd"> by failed bundles.</span>
<span class="sd"> If all retries of a bundle fails, the whole pipeline will fail *without*</span>
<span class="sd"> finalize_write() being invoked.</span>
<span class="sd"> A best practice is to make finalize atomic. If this is impossible given the</span>
<span class="sd"> semantics of the sink, finalize should be idempotent, as it may be called</span>
<span class="sd"> multiple times in the case of failure/retry or for redundancy.</span>
<span class="sd"> Note that the iteration order of the writer results is not guaranteed to be</span>
<span class="sd"> consistent if finalize is called multiple times.</span>
<span class="sd"> Args:</span>
<span class="sd"> init_result: the result of ``initialize_write()`` invocation.</span>
<span class="sd"> writer_results: an iterable containing results of ``Writer.close()``</span>
<span class="sd"> invocations. This will only contain results of successful writes, and</span>
<span class="sd"> will only contain the result of a single successful write for a given</span>
<span class="sd"> bundle.</span>
<span class="sd"> pre_finalize_result: the result of ``pre_finalize()`` invocation.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div>
<div class="viewcode-block" id="Writer"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Writer">[docs]</a><span class="k">class</span> <span class="nc">Writer</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;This class is deprecated, no backwards-compatibility guarantees.</span>
<span class="sd"> Writes a bundle of elements from a ``PCollection`` to a sink.</span>
<span class="sd"> A Writer ``iobase.Writer.write()`` writes elements to the sink while</span>
<span class="sd"> ``iobase.Writer.close()`` is called after all elements in the bundle have been</span>
<span class="sd"> written.</span>
<span class="sd"> See ``iobase.Sink`` for more detailed documentation about the process of</span>
<span class="sd"> writing to a sink.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="Writer.write"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Writer.write">[docs]</a> <span class="k">def</span> <span class="nf">write</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Writes a value to the sink using the current writer.&quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="Writer.close"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Writer.close">[docs]</a> <span class="k">def</span> <span class="nf">close</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Closes the current writer.</span>
<span class="sd"> Please see documentation in ``iobase.Sink`` for an example.</span>
<span class="sd"> Returns:</span>
<span class="sd"> An object representing the writes that were performed by the current</span>
<span class="sd"> writer.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div>
<div class="viewcode-block" id="Read"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read">[docs]</a><span class="k">class</span> <span class="nc">Read</span><span class="p">(</span><span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A transform that reads a PCollection.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initializes a Read transform.</span>
<span class="sd"> Args:</span>
<span class="sd"> source: Data source to read from.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nb">super</span><span class="p">(</span><span class="n">Read</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">source</span> <span class="o">=</span> <span class="n">source</span>
<div class="viewcode-block" id="Read.expand"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pbegin</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="k">import</span> <span class="n">DebugOptions</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="k">import</span> <span class="n">util</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pbegin</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">=</span> <span class="n">pbegin</span><span class="o">.</span><span class="n">pipeline</span>
<span class="n">debug_options</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">_options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">DebugOptions</span><span class="p">)</span>
<span class="k">if</span> <span class="n">debug_options</span><span class="o">.</span><span class="n">experiments</span> <span class="ow">and</span> <span class="s1">&#39;beam_fn_api&#39;</span> <span class="ow">in</span> <span class="n">debug_options</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span>
<span class="n">source</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span>
<span class="k">def</span> <span class="nf">split_source</span><span class="p">(</span><span class="n">unused_impulse</span><span class="p">):</span>
<span class="n">total_size</span> <span class="o">=</span> <span class="n">source</span><span class="o">.</span><span class="n">estimate_size</span><span class="p">()</span>
<span class="k">if</span> <span class="n">total_size</span><span class="p">:</span>
<span class="c1"># 1MB = 1 shard, 1GB = 32 shards, 1TB = 1000 shards, 1PB = 32k shards</span>
<span class="n">chunk_size</span> <span class="o">=</span> <span class="nb">max</span><span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">20</span><span class="p">,</span> <span class="mi">1000</span> <span class="o">*</span> <span class="nb">int</span><span class="p">(</span><span class="n">math</span><span class="o">.</span><span class="n">sqrt</span><span class="p">(</span><span class="n">total_size</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">chunk_size</span> <span class="o">=</span> <span class="mi">64</span> <span class="o">&lt;&lt;</span> <span class="mi">20</span> <span class="c1"># 64mb</span>
<span class="k">return</span> <span class="n">source</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="n">chunk_size</span><span class="p">)</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">pbegin</span>
<span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">Impulse</span><span class="p">()</span>
<span class="o">|</span> <span class="s1">&#39;Split&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span><span class="n">split_source</span><span class="p">)</span>
<span class="o">|</span> <span class="n">util</span><span class="o">.</span><span class="n">Reshuffle</span><span class="p">()</span>
<span class="o">|</span> <span class="s1">&#39;ReadSplits&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span><span class="k">lambda</span> <span class="n">split</span><span class="p">:</span> <span class="n">split</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">read</span><span class="p">(</span>
<span class="n">split</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">get_range_tracker</span><span class="p">(</span>
<span class="n">split</span><span class="o">.</span><span class="n">start_position</span><span class="p">,</span> <span class="n">split</span><span class="o">.</span><span class="n">stop_position</span><span class="p">))))</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># Treat Read itself as a primitive.</span>
<span class="k">return</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span></div>
<div class="viewcode-block" id="Read.get_windowing"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.get_windowing">[docs]</a> <span class="k">def</span> <span class="nf">get_windowing</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_inputs</span><span class="p">):</span>
<span class="k">return</span> <span class="n">core</span><span class="o">.</span><span class="n">Windowing</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span></div>
<span class="k">def</span> <span class="nf">_infer_output_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">input_coder</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">BoundedSource</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">default_output_coder</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">coder</span>
<div class="viewcode-block" id="Read.display_data"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="s1">&#39;source&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span>
<span class="n">label</span><span class="o">=</span><span class="s1">&#39;Read Source&#39;</span><span class="p">),</span>
<span class="s1">&#39;source_dd&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="p">}</span></div>
<div class="viewcode-block" id="Read.to_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.to_runner_api_parameter">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span><span class="n">common_urns</span><span class="o">.</span><span class="n">deprecated_primitives</span><span class="o">.</span><span class="n">READ</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ReadPayload</span><span class="p">(</span>
<span class="n">source</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">(</span><span class="n">context</span><span class="p">),</span>
<span class="n">is_bounded</span><span class="o">=</span><span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">IsBounded</span><span class="o">.</span><span class="n">BOUNDED</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">is_bounded</span><span class="p">()</span>
<span class="k">else</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">IsBounded</span><span class="o">.</span><span class="n">UNBOUNDED</span><span class="p">))</span></div>
<div class="viewcode-block" id="Read.from_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Read.from_runner_api_parameter">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">from_runner_api_parameter</span><span class="p">(</span><span class="n">parameter</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">return</span> <span class="n">Read</span><span class="p">(</span><span class="n">SourceBase</span><span class="o">.</span><span class="n">from_runner_api</span><span class="p">(</span><span class="n">parameter</span><span class="o">.</span><span class="n">source</span><span class="p">,</span> <span class="n">context</span><span class="p">))</span></div></div>
<span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">deprecated_primitives</span><span class="o">.</span><span class="n">READ</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ReadPayload</span><span class="p">,</span>
<span class="n">Read</span><span class="o">.</span><span class="n">from_runner_api_parameter</span><span class="p">)</span>
<div class="viewcode-block" id="Write"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Write">[docs]</a><span class="k">class</span> <span class="nc">Write</span><span class="p">(</span><span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A ``PTransform`` that writes to a sink.</span>
<span class="sd"> A sink should inherit ``iobase.Sink``. Such implementations are</span>
<span class="sd"> handled using a composite transform that consists of three ``ParDo``s -</span>
<span class="sd"> (1) a ``ParDo`` performing a global initialization (2) a ``ParDo`` performing</span>
<span class="sd"> a parallel write and (3) a ``ParDo`` performing a global finalization. In the</span>
<span class="sd"> case of an empty ``PCollection``, only the global initialization and</span>
<span class="sd"> finalization will be performed. Currently only batch workflows support custom</span>
<span class="sd"> sinks.</span>
<span class="sd"> Example usage::</span>
<span class="sd"> pcollection | beam.io.Write(MySink())</span>
<span class="sd"> This returns a ``pvalue.PValue`` object that represents the end of the</span>
<span class="sd"> Pipeline.</span>
<span class="sd"> The sink argument may also be a full PTransform, in which case it will be</span>
<span class="sd"> applied directly. This allows composite sink-like transforms (e.g. a sink</span>
<span class="sd"> with some pre-processing DoFns) to be used the same as all other sinks.</span>
<span class="sd"> This transform also supports sinks that inherit ``iobase.NativeSink``. These</span>
<span class="sd"> are sinks that are implemented natively by the Dataflow service and hence</span>
<span class="sd"> should not be updated by users. These sinks are processed using a Dataflow</span>
<span class="sd"> native write transform.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sink</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initializes a Write transform.</span>
<span class="sd"> Args:</span>
<span class="sd"> sink: Data sink to write to.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nb">super</span><span class="p">(</span><span class="n">Write</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink</span> <span class="o">=</span> <span class="n">sink</span>
<div class="viewcode-block" id="Write.display_data"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Write.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="s1">&#39;sink&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span>
<span class="s1">&#39;sink_dd&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">}</span></div>
<div class="viewcode-block" id="Write.expand"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.Write.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.dataflow.native_io</span> <span class="k">import</span> <span class="n">iobase</span> <span class="k">as</span> <span class="n">dataflow_io</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="n">dataflow_io</span><span class="o">.</span><span class="n">NativeSink</span><span class="p">):</span>
<span class="c1"># A native sink</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="s1">&#39;NativeWrite&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">dataflow_io</span><span class="o">.</span><span class="n">_NativeWrite</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="n">Sink</span><span class="p">):</span>
<span class="c1"># A custom sink</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">WriteImpl</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="c1"># This allows &quot;composite&quot; sinks to be used like non-composite ones.</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;A sink must inherit iobase.Sink, iobase.NativeSink, &#39;</span>
<span class="s1">&#39;or be a PTransform. Received : </span><span class="si">%r</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">)</span></div></div>
<span class="k">class</span> <span class="nc">WriteImpl</span><span class="p">(</span><span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Implements the writing of custom sinks.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sink</span><span class="p">):</span>
<span class="nb">super</span><span class="p">(</span><span class="n">WriteImpl</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink</span> <span class="o">=</span> <span class="n">sink</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="n">do_once</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">|</span> <span class="s1">&#39;DoOnce&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span>
<span class="n">init_result_coll</span> <span class="o">=</span> <span class="n">do_once</span> <span class="o">|</span> <span class="s1">&#39;InitializeWrite&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">_</span><span class="p">,</span> <span class="n">sink</span><span class="p">:</span> <span class="n">sink</span><span class="o">.</span><span class="n">initialize_write</span><span class="p">(),</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span> <span class="s1">&#39;num_shards&#39;</span><span class="p">,</span> <span class="mi">0</span><span class="p">):</span>
<span class="n">min_shards</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">num_shards</span>
<span class="k">if</span> <span class="n">min_shards</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="n">keyed_pcoll</span> <span class="o">=</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">x</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">keyed_pcoll</span> <span class="o">=</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">_RoundRobinKeyFn</span><span class="p">(</span><span class="n">min_shards</span><span class="p">))</span>
<span class="n">write_result_coll</span> <span class="o">=</span> <span class="p">(</span><span class="n">keyed_pcoll</span>
<span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span>
<span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span>
<span class="o">|</span> <span class="s1">&#39;WriteBundles&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_WriteKeyedBundleDoFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">),</span>
<span class="n">AsSingleton</span><span class="p">(</span><span class="n">init_result_coll</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">min_shards</span> <span class="o">=</span> <span class="mi">1</span>
<span class="n">write_result_coll</span> <span class="o">=</span> <span class="p">(</span><span class="n">pcoll</span>
<span class="o">|</span> <span class="s1">&#39;WriteBundles&#39;</span> <span class="o">&gt;&gt;</span>
<span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">_WriteBundleDoFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">),</span>
<span class="n">AsSingleton</span><span class="p">(</span><span class="n">init_result_coll</span><span class="p">))</span>
<span class="o">|</span> <span class="s1">&#39;Pair&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">x</span><span class="p">))</span>
<span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">())</span>
<span class="o">|</span> <span class="n">core</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span>
<span class="o">|</span> <span class="s1">&#39;Extract&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">]))</span>
<span class="c1"># PreFinalize should run before FinalizeWrite, and the two should not be</span>
<span class="c1"># fused.</span>
<span class="n">pre_finalize_coll</span> <span class="o">=</span> <span class="n">do_once</span> <span class="o">|</span> <span class="s1">&#39;PreFinalize&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span>
<span class="n">_pre_finalize</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span>
<span class="n">AsSingleton</span><span class="p">(</span><span class="n">init_result_coll</span><span class="p">),</span>
<span class="n">AsIter</span><span class="p">(</span><span class="n">write_result_coll</span><span class="p">))</span>
<span class="k">return</span> <span class="n">do_once</span> <span class="o">|</span> <span class="s1">&#39;FinalizeWrite&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">core</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span>
<span class="n">_finalize_write</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">,</span>
<span class="n">AsSingleton</span><span class="p">(</span><span class="n">init_result_coll</span><span class="p">),</span>
<span class="n">AsIter</span><span class="p">(</span><span class="n">write_result_coll</span><span class="p">),</span>
<span class="n">min_shards</span><span class="p">,</span>
<span class="n">AsSingleton</span><span class="p">(</span><span class="n">pre_finalize_coll</span><span class="p">))</span>
<span class="k">class</span> <span class="nc">_WriteBundleDoFn</span><span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A DoFn for writing elements to an iobase.Writer.</span>
<span class="sd"> Opens a writer at the first element and closes the writer at finish_bundle().</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sink</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink</span> <span class="o">=</span> <span class="n">sink</span>
<span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="s1">&#39;sink_dd&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">}</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">init_result</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># We ignore UUID collisions here since they are extremely rare.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">open_writer</span><span class="p">(</span><span class="n">init_result</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">()))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">WindowedValue</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">(),</span>
<span class="n">window</span><span class="o">.</span><span class="n">GlobalWindow</span><span class="p">()</span><span class="o">.</span><span class="n">max_timestamp</span><span class="p">(),</span>
<span class="p">[</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindow</span><span class="p">()])</span>
<span class="k">class</span> <span class="nc">_WriteKeyedBundleDoFn</span><span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sink</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink</span> <span class="o">=</span> <span class="n">sink</span>
<span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="s1">&#39;sink_dd&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="p">}</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">init_result</span><span class="p">):</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="n">element</span>
<span class="n">writer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink</span><span class="o">.</span><span class="n">open_writer</span><span class="p">(</span><span class="n">init_result</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">()))</span>
<span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">bundle</span><span class="p">[</span><span class="mi">1</span><span class="p">]:</span> <span class="c1"># values</span>
<span class="n">writer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
<span class="k">return</span> <span class="p">[</span><span class="n">window</span><span class="o">.</span><span class="n">TimestampedValue</span><span class="p">(</span><span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">(),</span> <span class="n">window</span><span class="o">.</span><span class="n">MAX_TIMESTAMP</span><span class="p">)]</span>
<span class="k">def</span> <span class="nf">_pre_finalize</span><span class="p">(</span><span class="n">unused_element</span><span class="p">,</span> <span class="n">sink</span><span class="p">,</span> <span class="n">init_result</span><span class="p">,</span> <span class="n">write_results</span><span class="p">):</span>
<span class="k">return</span> <span class="n">sink</span><span class="o">.</span><span class="n">pre_finalize</span><span class="p">(</span><span class="n">init_result</span><span class="p">,</span> <span class="n">write_results</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_finalize_write</span><span class="p">(</span><span class="n">unused_element</span><span class="p">,</span> <span class="n">sink</span><span class="p">,</span> <span class="n">init_result</span><span class="p">,</span> <span class="n">write_results</span><span class="p">,</span>
<span class="n">min_shards</span><span class="p">,</span> <span class="n">pre_finalize_results</span><span class="p">):</span>
<span class="n">write_results</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">write_results</span><span class="p">)</span>
<span class="n">extra_shards</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">write_results</span><span class="p">)</span> <span class="o">&lt;</span> <span class="n">min_shards</span><span class="p">:</span>
<span class="n">logging</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span>
<span class="s1">&#39;Creating </span><span class="si">%s</span><span class="s1"> empty shard(s).&#39;</span><span class="p">,</span> <span class="n">min_shards</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="n">write_results</span><span class="p">))</span>
<span class="k">for</span> <span class="n">_</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="n">min_shards</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="n">write_results</span><span class="p">)):</span>
<span class="n">writer</span> <span class="o">=</span> <span class="n">sink</span><span class="o">.</span><span class="n">open_writer</span><span class="p">(</span><span class="n">init_result</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">()))</span>
<span class="n">extra_shards</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">())</span>
<span class="n">outputs</span> <span class="o">=</span> <span class="n">sink</span><span class="o">.</span><span class="n">finalize_write</span><span class="p">(</span><span class="n">init_result</span><span class="p">,</span> <span class="n">write_results</span> <span class="o">+</span> <span class="n">extra_shards</span><span class="p">,</span>
<span class="n">pre_finalize_results</span><span class="p">)</span>
<span class="k">if</span> <span class="n">outputs</span><span class="p">:</span>
<span class="k">return</span> <span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">TimestampedValue</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="n">window</span><span class="o">.</span><span class="n">MAX_TIMESTAMP</span><span class="p">)</span> <span class="k">for</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">outputs</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_RoundRobinKeyFn</span><span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">count</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">count</span> <span class="o">=</span> <span class="n">count</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">counter</span> <span class="o">=</span> <span class="n">random</span><span class="o">.</span><span class="n">randint</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">count</span> <span class="o">-</span> <span class="mi">1</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">counter</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">counter</span> <span class="o">&gt;=</span> <span class="bp">self</span><span class="o">.</span><span class="n">count</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">counter</span> <span class="o">-=</span> <span class="bp">self</span><span class="o">.</span><span class="n">count</span>
<span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">counter</span><span class="p">,</span> <span class="n">element</span>
<div class="viewcode-block" id="RestrictionTracker"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker">[docs]</a><span class="k">class</span> <span class="nc">RestrictionTracker</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Manages concurrent access to a restriction.</span>
<span class="sd"> Experimental; no backwards-compatibility guarantees.</span>
<span class="sd"> Keeps track of the claimed part of a restriction for a Splittable DoFn.</span>
<span class="sd"> See following documents for more details.</span>
<span class="sd"> * https://s.apache.org/splittable-do-fn</span>
<span class="sd"> * https://s.apache.org/splittable-do-fn-python-sdk</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="RestrictionTracker.current_restriction"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.current_restriction">[docs]</a> <span class="k">def</span> <span class="nf">current_restriction</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns the current restriction.</span>
<span class="sd"> Returns a restriction accurately describing the full range of work the</span>
<span class="sd"> current ``DoFn.process()`` call will do, including already completed work.</span>
<span class="sd"> The current restriction returned by this method may be updated dynamically</span>
<span class="sd"> due to concurrent invocation of other methods of the</span>
<span class="sd"> ``RestrictionTracker``. For example, ``checkpoint()``.</span>
<span class="sd"> ** Thread safety **</span>
<span class="sd"> Methods of the class ``RestrictionTracker`` including this method may get</span>
<span class="sd"> invoked by different threads, hence must be made thread-safe, e.g. by using</span>
<span class="sd"> a single lock object.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RestrictionTracker.checkpoint"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.checkpoint">[docs]</a> <span class="k">def</span> <span class="nf">checkpoint</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Performs a checkpoint of the current restriction.</span>
<span class="sd"> Signals that the current ``DoFn.process()`` call should terminate as soon as</span>
<span class="sd"> possible. After this method returns, the tracker MUST refuse all future</span>
<span class="sd"> claim calls, and ``RestrictionTracker.check_done()`` MUST succeed.</span>
<span class="sd"> This invocation modifies the value returned by ``current_restriction()``</span>
<span class="sd"> invocation and returns a restriction representing the rest of the work. The</span>
<span class="sd"> old value of ``current_restriction()`` is equivalent to the new value of</span>
<span class="sd"> ``current_restriction()`` and the return value of this method invocation</span>
<span class="sd"> combined.</span>
<span class="sd"> ** Thread safety **</span>
<span class="sd"> Methods of the class ``RestrictionTracker`` including this method may get</span>
<span class="sd"> invoked by different threads, hence must be made thread-safe, e.g. by using</span>
<span class="sd"> a single lock object.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="RestrictionTracker.check_done"><a class="viewcode-back" href="../../../apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker.check_done">[docs]</a> <span class="k">def</span> <span class="nf">check_done</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Checks whether the restriction has been fully processed.</span>
<span class="sd"> Called by the runner after the iterator returned by ``DoFn.process()`` has</span>
<span class="sd"> been fully read.</span>
<span class="sd"> This method must raise a `ValueError` if there is still any unclaimed work</span>
<span class="sd"> remaining in the restriction when this method is invoked. The exception</span>
<span class="sd"> raised must have an informative error message.</span>
<span class="sd"> ** Thread safety **</span>
<span class="sd"> Methods of the class ``RestrictionTracker`` including this method may get</span>
<span class="sd"> invoked by different threads, hence must be made thread-safe, e.g. by using</span>
<span class="sd"> a single lock object.</span>
<span class="sd"> Returns: ``True`` if the current restriction has been fully processed.</span>
<span class="sd"> Raises:</span>
<span class="sd"> ~exceptions.ValueError: if there is still any unclaimed work remaining.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div>
</pre></div>
</div>
<div class="articleComments">
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
&copy; Copyright The Apache Software Foundation.
</p>
</div>
Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script type="text/javascript">
var DOCUMENTATION_OPTIONS = {
URL_ROOT:'../../../',
VERSION:'',
COLLAPSE_INDEX:false,
FILE_SUFFIX:'.html',
HAS_SOURCE: true,
SOURCELINK_SUFFIX: '.txt'
};
</script>
<script type="text/javascript" src="../../../_static/jquery.js"></script>
<script type="text/javascript" src="../../../_static/underscore.js"></script>
<script type="text/javascript" src="../../../_static/doctools.js"></script>
<script type="text/javascript" src="../../../_static/js/theme.js"></script>
<script type="text/javascript">
jQuery(function () {
SphinxRtdTheme.StickyNav.enable();
});
</script>
</body>
</html>