blob: a301a25c2dbc65720e322850dfb224ff11d6eb4f [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>apache_beam.transforms.core &mdash; Apache Beam documentation</title>
<link rel="stylesheet" href="../../../_static/css/theme.css" type="text/css" />
<link rel="index" title="Index"
href="../../../genindex.html"/>
<link rel="search" title="Search" href="../../../search.html"/>
<link rel="top" title="Apache Beam documentation" href="../../../index.html"/>
<link rel="up" title="Module code" href="../../index.html"/>
<script src="../../../_static/js/modernizr.min.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search">
<a href="../../../index.html" class="icon icon-home"> Apache Beam
</a>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.internal.html">apache_beam.internal package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.tools.html">apache_beam.tools package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li>
</ul>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.version.html">apache_beam.version module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../index.html">Module code</a> &raquo;</li>
<li>apache_beam.transforms.core</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.transforms.core</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Core PTransform subclasses, such as FlatMap, GroupByKey, and Map.&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">absolute_import</span>
<span class="kn">import</span> <span class="nn">copy</span>
<span class="kn">import</span> <span class="nn">inspect</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">random</span>
<span class="kn">import</span> <span class="nn">re</span>
<span class="kn">import</span> <span class="nn">types</span>
<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">map</span>
<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">object</span>
<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">range</span>
<span class="kn">from</span> <span class="nn">future.builtins</span> <span class="k">import</span> <span class="nb">filter</span>
<span class="kn">from</span> <span class="nn">past.builtins</span> <span class="k">import</span> <span class="n">unicode</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="k">import</span> <span class="n">coders</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="k">import</span> <span class="n">pvalue</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="k">import</span> <span class="n">typehints</span>
<span class="kn">from</span> <span class="nn">apache_beam.coders</span> <span class="k">import</span> <span class="n">typecoders</span>
<span class="kn">from</span> <span class="nn">apache_beam.internal</span> <span class="k">import</span> <span class="n">pickler</span>
<span class="kn">from</span> <span class="nn">apache_beam.internal</span> <span class="k">import</span> <span class="n">util</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="k">import</span> <span class="n">TypeOptions</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability</span> <span class="k">import</span> <span class="n">common_urns</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability</span> <span class="k">import</span> <span class="n">python_urns</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability.api</span> <span class="k">import</span> <span class="n">beam_runner_api_pb2</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="k">import</span> <span class="n">ptransform</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="k">import</span> <span class="n">userstate</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="k">import</span> <span class="n">DisplayDataItem</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="k">import</span> <span class="n">HasDisplayData</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.ptransform</span> <span class="k">import</span> <span class="n">PTransform</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.ptransform</span> <span class="k">import</span> <span class="n">PTransformWithSideInputs</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.userstate</span> <span class="k">import</span> <span class="n">StateSpec</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.userstate</span> <span class="k">import</span> <span class="n">TimerSpec</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="k">import</span> <span class="n">GlobalWindows</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="k">import</span> <span class="n">TimestampCombiner</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="k">import</span> <span class="n">TimestampedValue</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="k">import</span> <span class="n">WindowedValue</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="k">import</span> <span class="n">WindowFn</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="k">import</span> <span class="n">KV</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="k">import</span> <span class="n">Any</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="k">import</span> <span class="n">Iterable</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="k">import</span> <span class="n">Union</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="k">import</span> <span class="n">trivial_inference</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints.decorators</span> <span class="k">import</span> <span class="n">TypeCheckError</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints.decorators</span> <span class="k">import</span> <span class="n">WithTypeHints</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints.decorators</span> <span class="k">import</span> <span class="n">get_type_hints</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints.trivial_inference</span> <span class="k">import</span> <span class="n">element_type</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints.typehints</span> <span class="k">import</span> <span class="n">is_consistent_with</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils</span> <span class="k">import</span> <span class="n">urns</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span>
<span class="s1">&#39;DoFn&#39;</span><span class="p">,</span>
<span class="s1">&#39;CombineFn&#39;</span><span class="p">,</span>
<span class="s1">&#39;PartitionFn&#39;</span><span class="p">,</span>
<span class="s1">&#39;ParDo&#39;</span><span class="p">,</span>
<span class="s1">&#39;FlatMap&#39;</span><span class="p">,</span>
<span class="s1">&#39;Map&#39;</span><span class="p">,</span>
<span class="s1">&#39;Filter&#39;</span><span class="p">,</span>
<span class="s1">&#39;CombineGlobally&#39;</span><span class="p">,</span>
<span class="s1">&#39;CombinePerKey&#39;</span><span class="p">,</span>
<span class="s1">&#39;CombineValues&#39;</span><span class="p">,</span>
<span class="s1">&#39;GroupByKey&#39;</span><span class="p">,</span>
<span class="s1">&#39;Partition&#39;</span><span class="p">,</span>
<span class="s1">&#39;Windowing&#39;</span><span class="p">,</span>
<span class="s1">&#39;WindowInto&#39;</span><span class="p">,</span>
<span class="s1">&#39;Flatten&#39;</span><span class="p">,</span>
<span class="s1">&#39;Create&#39;</span><span class="p">,</span>
<span class="s1">&#39;Impulse&#39;</span><span class="p">,</span>
<span class="p">]</span>
<span class="c1"># Type variables</span>
<span class="n">T</span> <span class="o">=</span> <span class="n">typehints</span><span class="o">.</span><span class="n">TypeVariable</span><span class="p">(</span><span class="s1">&#39;T&#39;</span><span class="p">)</span>
<span class="n">K</span> <span class="o">=</span> <span class="n">typehints</span><span class="o">.</span><span class="n">TypeVariable</span><span class="p">(</span><span class="s1">&#39;K&#39;</span><span class="p">)</span>
<span class="n">V</span> <span class="o">=</span> <span class="n">typehints</span><span class="o">.</span><span class="n">TypeVariable</span><span class="p">(</span><span class="s1">&#39;V&#39;</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">DoFnContext</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A context available to all methods of a DoFn instance.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="k">class</span> <span class="nc">DoFnProcessContext</span><span class="p">(</span><span class="n">DoFnContext</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A processing context passed to DoFn process() during execution.</span>
<span class="sd"> Experimental; no backwards-compatibility guarantees.</span>
<span class="sd"> Most importantly, a DoFn.process method will access context.element</span>
<span class="sd"> to get the element it is supposed to process.</span>
<span class="sd"> Attributes:</span>
<span class="sd"> label: label of the ParDo whose element is being processed.</span>
<span class="sd"> element: element being processed</span>
<span class="sd"> (in process method only; always None in start_bundle and finish_bundle)</span>
<span class="sd"> timestamp: timestamp of the element</span>
<span class="sd"> (in process method only; always None in start_bundle and finish_bundle)</span>
<span class="sd"> windows: windows of the element</span>
<span class="sd"> (in process method only; always None in start_bundle and finish_bundle)</span>
<span class="sd"> state: a DoFnState object, which holds the runner&#39;s internal state</span>
<span class="sd"> for this element.</span>
<span class="sd"> Not used by the pipeline code.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">label</span><span class="p">,</span> <span class="n">element</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">state</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initialize a processing context object with an element and state.</span>
<span class="sd"> The element represents one value from a PCollection that will be accessed</span>
<span class="sd"> by a DoFn object during pipeline execution, and state is an arbitrary object</span>
<span class="sd"> where counters and other pipeline state information can be passed in.</span>
<span class="sd"> DoFnProcessContext objects are also used as inputs to PartitionFn instances.</span>
<span class="sd"> Args:</span>
<span class="sd"> label: label of the ParDo whose element is being processed.</span>
<span class="sd"> element: element of a PCollection being processed using this context.</span>
<span class="sd"> state: a DoFnState object with state to be passed in to the DoFn object.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">label</span> <span class="o">=</span> <span class="n">label</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">state</span>
<span class="k">if</span> <span class="n">element</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">set_element</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">set_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">windowed_value</span><span class="p">):</span>
<span class="k">if</span> <span class="n">windowed_value</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># Not currently processing an element.</span>
<span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;element&#39;</span><span class="p">):</span>
<span class="k">del</span> <span class="bp">self</span><span class="o">.</span><span class="n">element</span>
<span class="k">del</span> <span class="bp">self</span><span class="o">.</span><span class="n">timestamp</span>
<span class="k">del</span> <span class="bp">self</span><span class="o">.</span><span class="n">windows</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">element</span> <span class="o">=</span> <span class="n">windowed_value</span><span class="o">.</span><span class="n">value</span>
<span class="bp">self</span><span class="o">.</span><span class="n">timestamp</span> <span class="o">=</span> <span class="n">windowed_value</span><span class="o">.</span><span class="n">timestamp</span>
<span class="bp">self</span><span class="o">.</span><span class="n">windows</span> <span class="o">=</span> <span class="n">windowed_value</span><span class="o">.</span><span class="n">windows</span>
<span class="k">class</span> <span class="nc">ProcessContinuation</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;An object that may be produced as the last element of a process method</span>
<span class="sd"> invocation.</span>
<span class="sd"> Experimental; no backwards-compatibility guarantees.</span>
<span class="sd"> If produced, indicates that there is more work to be done for the current</span>
<span class="sd"> input element.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">resume_delay</span><span class="o">=</span><span class="mi">0</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initializes a ProcessContinuation object.</span>
<span class="sd"> Args:</span>
<span class="sd"> resume_delay: indicates the minimum time, in seconds, that should elapse</span>
<span class="sd"> before re-invoking process() method for resuming the invocation of the</span>
<span class="sd"> current element.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">resume_delay</span> <span class="o">=</span> <span class="n">resume_delay</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">resume</span><span class="p">(</span><span class="n">resume_delay</span><span class="o">=</span><span class="mi">0</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A convenient method that produces a ``ProcessContinuation``.</span>
<span class="sd"> Args:</span>
<span class="sd"> resume_delay: delay (in seconds) after which processing of the current element should be</span>
<span class="sd"> resumed.</span>
<span class="sd"> Returns: a ``ProcessContinuation`` for signalling the runner that current</span>
<span class="sd"> input element has not been fully processed and should be resumed later.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">ProcessContinuation</span><span class="p">(</span><span class="n">resume_delay</span><span class="o">=</span><span class="n">resume_delay</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">RestrictionProvider</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Provides methods for generating and manipulating restrictions.</span>
<span class="sd"> This class should be implemented to support Splittable ``DoFn``s in Python</span>
<span class="sd"> SDK. See https://s.apache.org/splittable-do-fn for more details about</span>
<span class="sd"> Splittable ``DoFn``s.</span>
<span class="sd"> To mark a ``DoFn`` class as a Splittable ``DoFn``, the ``DoFn.process()``</span>
<span class="sd"> method of that class should have exactly one parameter whose default value is</span>
<span class="sd"> an instance of ``RestrictionProvider``.</span>
<span class="sd"> The provided ``RestrictionProvider`` instance must provide suitable overrides</span>
<span class="sd"> for the following methods.</span>
<span class="sd"> * create_tracker()</span>
<span class="sd"> * initial_restriction()</span>
<span class="sd"> Optionally, ``RestrictionProvider`` may override default implementations of</span>
<span class="sd"> the following methods.</span>
<span class="sd"> * restriction_coder()</span>
<span class="sd"> * split()</span>
<span class="sd"> ** Pausing and resuming processing of an element **</span>
<span class="sd"> As the last element produced by the iterator returned by the</span>
<span class="sd"> ``DoFn.process()`` method, a Splittable ``DoFn`` may return an object of type</span>
<span class="sd"> ``ProcessContinuation``.</span>
<span class="sd"> If provided, the ``ProcessContinuation`` object specifies that the runner should</span>
<span class="sd"> later re-invoke ``DoFn.process()`` method to resume processing the current</span>
<span class="sd"> element and the manner in which the re-invocation should be performed. A</span>
<span class="sd"> ``ProcessContinuation`` object must only be specified as the last element of</span>
<span class="sd"> the iterator. If a ``ProcessContinuation`` object is not provided the runner</span>
<span class="sd"> will assume that the current input element has been fully processed.</span>
<span class="sd"> ** Updating output watermark **</span>
<span class="sd"> The ``DoFn.process()`` method of a Splittable ``DoFn`` may contain a parameter</span>
<span class="sd"> with default value ``DoFn.WatermarkReporterParam``. If specified this asks the</span>
<span class="sd"> runner to provide a function that can be used to give the runner a</span>
<span class="sd"> (best-effort) lower bound about the timestamps of future output associated</span>
<span class="sd"> with the current element processed by the ``DoFn``. If the ``DoFn`` has</span>
<span class="sd"> multiple outputs, the watermark applies to all of them. Provided function must</span>
<span class="sd"> be invoked with a single argument: either a ``Timestamp`` or an integer that</span>
<span class="sd"> gives the watermark as a number of seconds.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">create_tracker</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Produces a new ``RestrictionTracker`` for the given restriction.</span>
<span class="sd"> Args:</span>
<span class="sd"> restriction: an object that defines a restriction as identified by a</span>
<span class="sd"> Splittable ``DoFn`` that utilizes the current ``RestrictionProvider``.</span>
<span class="sd"> For example, a tuple that gives a range of positions for a Splittable</span>
<span class="sd"> ``DoFn`` that reads files based on byte positions.</span>
<span class="sd"> Returns: an object of type ``RestrictionTracker``.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span>
<span class="k">def</span> <span class="nf">initial_restriction</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Produces an initial restriction for the given element.&quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span>
<span class="k">def</span> <span class="nf">split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Splits the given element and restriction.</span>
<span class="sd"> Returns an iterator of restrictions. The total set of elements produced by</span>
<span class="sd"> reading input element for each of the returned restrictions should be the</span>
<span class="sd"> same as the total set of elements produced by reading the input element for</span>
<span class="sd"> the input restriction.</span>
<span class="sd"> TODO(chamikara): give suitable hints for performing splitting, for example</span>
<span class="sd"> number of parts or size in bytes.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">yield</span> <span class="n">restriction</span>
<span class="k">def</span> <span class="nf">restriction_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns a ``Coder`` for restrictions.</span>
<span class="sd"> The returned ``Coder`` will be used for the restrictions produced by the current</span>
<span class="sd"> ``RestrictionProvider``.</span>
<span class="sd"> Returns:</span>
<span class="sd"> an object of type ``Coder``.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="nb">object</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">get_function_arguments</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">func</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Return the argument spec of the method named ``func`` on ``obj``. If</span>
<span class="sd"> ``obj`` defines an ``_inspect_&lt;func&gt;`` method, call it and return its</span>
<span class="sd"> result; otherwise fall back to the Python inspect library.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">func_name</span> <span class="o">=</span> <span class="s1">&#39;_inspect_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">func</span>
<span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">func_name</span><span class="p">):</span>
<span class="n">f</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">func_name</span><span class="p">)</span>
<span class="k">return</span> <span class="n">f</span><span class="p">()</span>
<span class="n">f</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">func</span><span class="p">)</span>
<span class="k">return</span> <span class="n">inspect</span><span class="o">.</span><span class="n">getargspec</span><span class="p">(</span><span class="n">f</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_DoFnParam</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;DoFn parameter.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">param_id</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">param_id</span> <span class="o">=</span> <span class="n">param_id</span>
<span class="k">def</span> <span class="nf">__eq__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">==</span> <span class="nb">type</span><span class="p">(</span><span class="n">other</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">param_id</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">param_id</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">def</span> <span class="nf">__hash__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">hash</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">param_id</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">param_id</span>
<span class="k">class</span> <span class="nc">_StateDoFnParam</span><span class="p">(</span><span class="n">_DoFnParam</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;State DoFn parameter.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state_spec</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">state_spec</span><span class="p">,</span> <span class="n">StateSpec</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;DoFn.StateParam expected StateSpec object.&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state_spec</span> <span class="o">=</span> <span class="n">state_spec</span>
<span class="bp">self</span><span class="o">.</span><span class="n">param_id</span> <span class="o">=</span> <span class="s1">&#39;StateParam(</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="n">state_spec</span><span class="o">.</span><span class="n">name</span>
<!-- _TimerDoFnParam: parameter marker wrapping a TimerSpec; exposed below as DoFn.TimerParam. Its param_id is 'TimerParam(spec name)'. --><span class="k">class</span> <span class="nc">_TimerDoFnParam</span><span class="p">(</span><span class="n">_DoFnParam</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Timer DoFn parameter.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timer_spec</span><span class="p">):</span>
<!-- Anything that is not a TimerSpec instance is rejected with ValueError. --><span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">timer_spec</span><span class="p">,</span> <span class="n">TimerSpec</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;DoFn.TimerParam expected TimerSpec object.&quot;</span><span class="p">)</span>
<!-- param_id is set directly here; _DoFnParam.__init__ is not called (its body is outside this view). --><span class="bp">self</span><span class="o">.</span><span class="n">timer_spec</span> <span class="o">=</span> <span class="n">timer_spec</span>
<span class="bp">self</span><span class="o">.</span><span class="n">param_id</span> <span class="o">=</span> <span class="s1">&#39;TimerParam(</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="n">timer_spec</span><span class="o">.</span><span class="n">name</span>
<!-- DoFn: base class for per-element processing functions (see docstring). This listing mirrors the upstream apache_beam.transforms.core module; keep code spans in sync with it. --><div class="viewcode-block" id="DoFn"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn">[docs]</a><span class="k">class</span> <span class="nc">DoFn</span><span class="p">(</span><span class="n">WithTypeHints</span><span class="p">,</span> <span class="n">HasDisplayData</span><span class="p">,</span> <span class="n">urns</span><span class="o">.</span><span class="n">RunnerApiFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A function object used by a transform with custom processing.</span>
<span class="sd"> The ParDo transform is such a transform. The ParDo.apply</span>
<span class="sd"> method will take an object of type DoFn and apply it to all elements of a</span>
<span class="sd"> PCollection object.</span>
<span class="sd"> In order to have concrete DoFn objects one has to subclass from DoFn and</span>
<span class="sd"> define the desired behavior (start_bundle/finish_bundle and process) or wrap a</span>
<span class="sd"> callable object using the CallableWrapperDoFn class.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Parameters that can be used in the .process() method.</span>
<!-- Sentinel _DoFnParam instances; per the process() docstring they are used as default argument values to request runner-supplied values. --><span class="n">ElementParam</span> <span class="o">=</span> <span class="n">_DoFnParam</span><span class="p">(</span><span class="s1">&#39;ElementParam&#39;</span><span class="p">)</span>
<span class="n">SideInputParam</span> <span class="o">=</span> <span class="n">_DoFnParam</span><span class="p">(</span><span class="s1">&#39;SideInputParam&#39;</span><span class="p">)</span>
<span class="n">TimestampParam</span> <span class="o">=</span> <span class="n">_DoFnParam</span><span class="p">(</span><span class="s1">&#39;TimestampParam&#39;</span><span class="p">)</span>
<span class="n">WindowParam</span> <span class="o">=</span> <span class="n">_DoFnParam</span><span class="p">(</span><span class="s1">&#39;WindowParam&#39;</span><span class="p">)</span>
<span class="n">WatermarkReporterParam</span> <span class="o">=</span> <span class="n">_DoFnParam</span><span class="p">(</span><span class="s1">&#39;WatermarkReporterParam&#39;</span><span class="p">)</span>
<span class="n">DoFnProcessParams</span> <span class="o">=</span> <span class="p">[</span><span class="n">ElementParam</span><span class="p">,</span> <span class="n">SideInputParam</span><span class="p">,</span> <span class="n">TimestampParam</span><span class="p">,</span>
<span class="n">WindowParam</span><span class="p">,</span> <span class="n">WatermarkReporterParam</span><span class="p">]</span>
<span class="c1"># Parameters to access state and timers. Not restricted to use only in the</span>
<span class="c1"># .process() method. Usage: DoFn.StateParam(state_spec),</span>
<span class="c1"># DoFn.TimerParam(timer_spec).</span>
<!-- StateParam / TimerParam are the classes themselves, so calling them builds a new marker per spec. --><span class="n">StateParam</span> <span class="o">=</span> <span class="n">_StateDoFnParam</span>
<span class="n">TimerParam</span> <span class="o">=</span> <span class="n">_TimerDoFnParam</span>
<div class="viewcode-block" id="DoFn.from_callable"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.from_callable">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">from_callable</span><span class="p">(</span><span class="n">fn</span><span class="p">):</span>
<span class="k">return</span> <span class="n">CallableWrapperDoFn</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span></div>
<div class="viewcode-block" id="DoFn.default_label"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.default_label">[docs]</a> <span class="k">def</span> <span class="nf">default_label</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span></div>
<div class="viewcode-block" id="DoFn.process"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Method to use for processing elements.</span>
<span class="sd"> This is invoked by ``DoFnRunner`` for each element of an input</span>
<span class="sd"> ``PCollection``.</span>
<span class="sd"> If specified, the following default arguments are used by the ``DoFnRunner`` to</span>
<span class="sd"> be able to pass the parameters correctly.</span>
<span class="sd"> ``DoFn.ElementParam``: element to be processed.</span>
<span class="sd"> ``DoFn.SideInputParam``: a side input that may be used when processing.</span>
<span class="sd"> ``DoFn.TimestampParam``: timestamp of the input element.</span>
<span class="sd"> ``DoFn.WindowParam``: ``Window`` the input element belongs to.</span>
<span class="sd"> A ``RestrictionProvider`` instance: an ``iobase.RestrictionTracker`` will be</span>
<span class="sd"> provided here to allow treatment as a Splittable ``DoFn``.</span>
<span class="sd"> ``DoFn.WatermarkReporterParam``: a function that can be used to report</span>
<span class="sd"> output watermark of Splittable ``DoFn`` implementations.</span>
<span class="sd"> Args:</span>
<span class="sd"> element: The element to be processed</span>
<span class="sd"> *args: side inputs</span>
<span class="sd"> **kwargs: other keyword arguments.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<!-- Abstract: subclasses must override process(). --><span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="DoFn.start_bundle"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.start_bundle">[docs]</a> <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Called before a bundle of elements is processed on a worker.</span>
<span class="sd"> Elements to be processed are split into bundles and distributed</span>
<span class="sd"> to workers. Before a worker calls process() on the first element</span>
<span class="sd"> of its bundle, it calls this method.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">pass</span></div>
<div class="viewcode-block" id="DoFn.finish_bundle"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.finish_bundle">[docs]</a> <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Called after a bundle of elements is processed on a worker.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">pass</span></div>
<!-- Delegates to the module-level get_function_arguments helper (defined outside this view). --><div class="viewcode-block" id="DoFn.get_function_arguments"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.get_function_arguments">[docs]</a> <span class="k">def</span> <span class="nf">get_function_arguments</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">func</span><span class="p">):</span>
<span class="k">return</span> <span class="n">get_function_arguments</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">func</span><span class="p">)</span></div>
<span class="c1"># TODO(sourabhbajaj): Do we want to remove the responsibility of these from</span>
<span class="c1"># the DoFn or maybe the runner</span>
<div class="viewcode-block" id="DoFn.infer_output_type"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.infer_output_type">[docs]</a> <span class="k">def</span> <span class="nf">infer_output_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="p">):</span>
<span class="c1"># TODO(robertwb): Side inputs types.</span>
<span class="c1"># TODO(robertwb): Assert compatibility with input type hint?</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_strip_output_annotations</span><span class="p">(</span>
<span class="n">trivial_inference</span><span class="o">.</span><span class="n">infer_return_type</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">process</span><span class="p">,</span> <span class="p">[</span><span class="n">input_type</span><span class="p">]))</span></div>
<!-- _strip_output_annotations: a hint that is TimestampedValue, WindowedValue or TaggedOutput, or whose element type is one of them, is widened to Any. --><span class="k">def</span> <span class="nf">_strip_output_annotations</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">type_hint</span><span class="p">):</span>
<span class="n">annotations</span> <span class="o">=</span> <span class="p">(</span><span class="n">TimestampedValue</span><span class="p">,</span> <span class="n">WindowedValue</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">)</span>
<span class="c1"># TODO(robertwb): These should be parameterized types that the</span>
<span class="c1"># type inferencer understands.</span>
<span class="k">if</span> <span class="p">(</span><span class="n">type_hint</span> <span class="ow">in</span> <span class="n">annotations</span>
<span class="ow">or</span> <span class="n">trivial_inference</span><span class="o">.</span><span class="n">element_type</span><span class="p">(</span><span class="n">type_hint</span><span class="p">)</span> <span class="ow">in</span> <span class="n">annotations</span><span class="p">):</span>
<span class="k">return</span> <span class="n">Any</span>
<span class="k">return</span> <span class="n">type_hint</span>
<span class="k">def</span> <span class="nf">_process_argspec_fn</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns the Python callable that will eventually be invoked.</span>
<span class="sd"> This should ideally be the user-level function that is called with</span>
<span class="sd"> the main and (if any) side inputs, and is used to relate the type</span>
<span class="sd"> hint parameters with the input parameters (e.g., by argument name).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">process</span>
<div class="viewcode-block" id="DoFn.is_process_bounded"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.is_process_bounded">[docs]</a> <span class="k">def</span> <span class="nf">is_process_bounded</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Checks if an object is a bound method on an instance.&quot;&quot;&quot;</span>
<!-- Returns False for non-methods, unbound methods (__self__ is None) and classmethods (__self__ is a class); True only for instance-bound methods. --><span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">process</span><span class="p">,</span> <span class="n">types</span><span class="o">.</span><span class="n">MethodType</span><span class="p">):</span>
<span class="k">return</span> <span class="kc">False</span> <span class="c1"># Not a method</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">process</span><span class="o">.</span><span class="vm">__self__</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span> <span class="c1"># Method is not bound</span>
<span class="k">if</span> <span class="nb">issubclass</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">process</span><span class="o">.</span><span class="vm">__self__</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span> <span class="nb">type</span><span class="p">)</span> <span class="ow">or</span> \
<span class="bp">self</span><span class="o">.</span><span class="n">process</span><span class="o">.</span><span class="vm">__self__</span><span class="o">.</span><span class="vm">__class__</span> <span class="ow">is</span> <span class="nb">type</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span> <span class="c1"># Method is a classmethod</span>
<span class="k">return</span> <span class="kc">True</span></div>
<!-- Class-body statement: registers DoFn for pickled runner-API serialization under python_urns.PICKLED_DOFN; the trailing div closes the DoFn viewcode block. --><span class="n">urns</span><span class="o">.</span><span class="n">RunnerApiFn</span><span class="o">.</span><span class="n">register_pickle_urn</span><span class="p">(</span><span class="n">python_urns</span><span class="o">.</span><span class="n">PICKLED_DOFN</span><span class="p">)</span></div>
<!-- _fn_takes_side_inputs: reports whether fn may accept arguments beyond the main element. It answers True when fn's signature cannot be inspected. --><span class="k">def</span> <span class="nf">_fn_takes_side_inputs</span><span class="p">(</span><span class="n">fn</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<!-- NOTE(review): inspect.getargspec is deprecated in Python 3 and removed in 3.11; its .keywords field is named varkw in getfullargspec. Fix in the upstream source this listing mirrors. --><span class="n">argspec</span> <span class="o">=</span> <span class="n">inspect</span><span class="o">.</span><span class="n">getargspec</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="c1"># We can&#39;t tell; maybe it does.</span>
<span class="k">return</span> <span class="kc">True</span>
<!-- A bound method (non-None __self__) carries one extra leading positional arg (self) that is not a side input, hence 1 + is_bound. --><span class="n">is_bound</span> <span class="o">=</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="n">types</span><span class="o">.</span><span class="n">MethodType</span><span class="p">)</span> <span class="ow">and</span> <span class="n">fn</span><span class="o">.</span><span class="vm">__self__</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<!-- Result is truthy/falsy rather than strictly bool: varargs and keywords are parameter names or None. --><span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="n">argspec</span><span class="o">.</span><span class="n">args</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">1</span> <span class="o">+</span> <span class="n">is_bound</span> <span class="ow">or</span> <span class="n">argspec</span><span class="o">.</span><span class="n">varargs</span> <span class="ow">or</span> <span class="n">argspec</span><span class="o">.</span><span class="n">keywords</span>
<!-- CallableWrapperDoFn: adapts a plain callable into a DoFn; DoFn.from_callable returns one of these. --><span class="k">class</span> <span class="nc">CallableWrapperDoFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;For internal use only; no backwards-compatibility guarantees.</span>
<span class="sd"> A DoFn (function) object wrapping a callable object.</span>
<span class="sd"> The purpose of this class is to conveniently wrap simple functions and use</span>
<span class="sd"> them in transforms.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initializes a CallableWrapperDoFn object wrapping a callable.</span>
<span class="sd"> Args:</span>
<span class="sd"> fn: A callable object.</span>
<span class="sd"> Raises:</span>
<span class="sd"> TypeError: if fn parameter is not a callable type.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">callable</span><span class="p">(</span><span class="n">fn</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">&#39;Expected a callable object instead of: </span><span class="si">%r</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">fn</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_fn</span> <span class="o">=</span> <span class="n">fn</span>
<!-- Functions, methods and builtins become process directly. Any other callable is wrapped in a one-argument lambda, so it only ever receives the element. --><span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="p">(</span>
<span class="n">types</span><span class="o">.</span><span class="n">BuiltinFunctionType</span><span class="p">,</span> <span class="n">types</span><span class="o">.</span><span class="n">MethodType</span><span class="p">,</span> <span class="n">types</span><span class="o">.</span><span class="n">FunctionType</span><span class="p">)):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">process</span> <span class="o">=</span> <span class="n">fn</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># For cases such as set / list where fn is callable but not a function</span>
<span class="bp">self</span><span class="o">.</span><span class="n">process</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">element</span><span class="p">:</span> <span class="n">fn</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="nb">super</span><span class="p">(</span><span class="n">CallableWrapperDoFn</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<!-- display_data: a single 'fn' item labelled 'Transform Function'. --><span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># If the callable has a name, then it&#39;s likely a function, and</span>
<span class="c1"># we show its name.</span>
<span class="c1"># Otherwise, it might be an instance of a callable class. We</span>
<span class="c1"># show its class.</span>
<span class="n">display_data_value</span> <span class="o">=</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="o">.</span><span class="vm">__name__</span> <span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">,</span> <span class="s1">&#39;__name__&#39;</span><span class="p">)</span>
<span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="o">.</span><span class="vm">__class__</span><span class="p">)</span>
<span class="k">return</span> <span class="p">{</span><span class="s1">&#39;fn&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="n">display_data_value</span><span class="p">,</span>
<span class="n">label</span><span class="o">=</span><span class="s1">&#39;Transform Function&#39;</span><span class="p">)}</span>
<span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s1">&#39;CallableWrapperDoFn(</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span>
<!-- default_type_hints: hints come from the wrapped callable. A single output hint consistent with Iterable[Any] is replaced by its element type, on a copy of the hints. --><span class="k">def</span> <span class="nf">default_type_hints</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">type_hints</span> <span class="o">=</span> <span class="n">get_type_hints</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">)</span>
<span class="c1"># If the fn was a DoFn annotated with a type-hint that hinted a return</span>
<span class="c1"># type compatible with Iterable[Any], then we strip off the outer</span>
<span class="c1"># container type due to the &#39;flatten&#39; portion of FlatMap.</span>
<span class="c1"># TODO(robertwb): Should we require an iterable specification for FlatMap?</span>
<span class="k">if</span> <span class="n">type_hints</span><span class="o">.</span><span class="n">output_types</span><span class="p">:</span>
<span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span> <span class="o">=</span> <span class="n">type_hints</span><span class="o">.</span><span class="n">output_types</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">args</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span> <span class="ow">and</span> <span class="n">is_consistent_with</span><span class="p">(</span><span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">Any</span><span class="p">]):</span>
<span class="n">type_hints</span> <span class="o">=</span> <span class="n">type_hints</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="n">type_hints</span><span class="o">.</span><span class="n">set_output_types</span><span class="p">(</span><span class="n">element_type</span><span class="p">(</span><span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">]),</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">return</span> <span class="n">type_hints</span>
<!-- infer_output_type: infers from the wrapped callable rather than from process (which may be the wrapping lambda). --><span class="k">def</span> <span class="nf">infer_output_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_strip_output_annotations</span><span class="p">(</span>
<span class="n">trivial_inference</span><span class="o">.</span><span class="n">infer_return_type</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">,</span> <span class="p">[</span><span class="n">input_type</span><span class="p">]))</span>
<!-- _process_argspec_fn: prefers an _argspec_fn attribute on the callable when present, else the callable itself. --><span class="k">def</span> <span class="nf">_process_argspec_fn</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">,</span> <span class="s1">&#39;_argspec_fn&#39;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">)</span>
<!-- NOTE(review): inspect.getargspec is deprecated in Python 3 and removed in 3.11; fix in the upstream source this listing mirrors. --><span class="k">def</span> <span class="nf">_inspect_process</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="n">inspect</span><span class="o">.</span><span class="n">getargspec</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_process_argspec_fn</span><span class="p">())</span>
<div class="viewcode-block" id="CombineFn"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineFn">[docs]</a><span class="k">class</span> <span class="nc">CombineFn</span><span class="p">(</span><span class="n">WithTypeHints</span><span class="p">,</span> <span class="n">HasDisplayData</span><span class="p">,</span> <span class="n">urns</span><span class="o">.</span><span class="n">RunnerApiFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A function object used by a Combine transform with custom processing.</span>
<span class="sd"> A CombineFn specifies how multiple values in all or part of a PCollection can</span>
<span class="sd"> be merged into a single value---essentially providing the same kind of</span>
<span class="sd"> information as the arguments to the Python &quot;reduce&quot; builtin (except for the</span>
<span class="sd"> input argument, which is an instance of CombineFnProcessContext). The</span>
<span class="sd"> combining process proceeds as follows:</span>
<span class="sd"> 1. Input values are partitioned into one or more batches.</span>
<span class="sd"> 2. For each batch, the create_accumulator method is invoked to create a fresh</span>
<span class="sd"> initial &quot;accumulator&quot; value representing the combination of zero values.</span>
<span class="sd"> 3. For each input value in the batch, the add_input method is invoked to</span>
<span class="sd"> combine more values with the accumulator for that batch.</span>
<span class="sd"> 4. The merge_accumulators method is invoked to combine accumulators from</span>
<span class="sd"> separate batches into a single combined output accumulator value, once all</span>
<span class="sd"> of the accumulators have had all the input value in their batches added to</span>
<span class="sd"> them. This operation is invoked repeatedly, until there is only one</span>
<span class="sd"> accumulator value left.</span>
<span class="sd"> 5. The extract_output operation is invoked on the final accumulator to get</span>
<span class="sd"> the output value.</span>
<span class="sd"> Note: If this **CombineFn** is used with a transform that has defaults,</span>
<span class="sd"> **apply** will be called with an empty list at expansion time to get the</span>
<span class="sd"> default value.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<!-- CombineFn.default_label: the label is the concrete subclass name. --><div class="viewcode-block" id="CombineFn.default_label"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineFn.default_label">[docs]</a> <span class="k">def</span> <span class="nf">default_label</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span></div>
<!-- CombineFn.create_accumulator: abstract; the base version raises NotImplementedError carrying str(self). --><div class="viewcode-block" id="CombineFn.create_accumulator"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineFn.create_accumulator">[docs]</a> <span class="k">def</span> <span class="nf">create_accumulator</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Return a fresh, empty accumulator for the combine operation.</span>
<span class="sd"> Args:</span>
<span class="sd"> *args: Additional arguments and side inputs.</span>
<span class="sd"> **kwargs: Additional arguments and side inputs.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div>
<!-- CombineFn.add_input: abstract; the base version raises NotImplementedError carrying str(self). The default add_inputs is built on it. --><div class="viewcode-block" id="CombineFn.add_input"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineFn.add_input">[docs]</a> <span class="k">def</span> <span class="nf">add_input</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">accumulator</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Return result of folding element into accumulator.</span>
<span class="sd"> CombineFn implementors must override add_input.</span>
<span class="sd"> Args:</span>
<span class="sd"> accumulator: the current accumulator</span>
<span class="sd"> element: the element to add</span>
<span class="sd"> *args: Additional arguments and side inputs.</span>
<span class="sd"> **kwargs: Additional arguments and side inputs.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div>
<!-- CombineFn.add_inputs: the default folds elements one at a time through add_input, forwarding args/kwargs to every call. Override it for cheaper bulk addition. --><div class="viewcode-block" id="CombineFn.add_inputs"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineFn.add_inputs">[docs]</a> <span class="k">def</span> <span class="nf">add_inputs</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">accumulator</span><span class="p">,</span> <span class="n">elements</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns the result of folding each element in elements into accumulator.</span>
<span class="sd"> This is provided in case the implementation affords more efficient</span>
<span class="sd"> bulk addition of elements. The default implementation simply loops</span>
<span class="sd"> over the inputs invoking add_input for each one.</span>
<span class="sd"> Args:</span>
<span class="sd"> accumulator: the current accumulator</span>
<span class="sd"> elements: the elements to add</span>
<span class="sd"> *args: Additional arguments and side inputs.</span>
<span class="sd"> **kwargs: Additional arguments and side inputs.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">for</span> <span class="n">element</span> <span class="ow">in</span> <span class="n">elements</span><span class="p">:</span>
<span class="n">accumulator</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">add_input</span><span class="p">(</span><span class="n">accumulator</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<!-- With no elements, the incoming accumulator is returned unchanged. --><span class="k">return</span> <span class="n">accumulator</span></div>
<!-- CombineFn.merge_accumulators: abstract; the base version raises NotImplementedError carrying str(self). --><div class="viewcode-block" id="CombineFn.merge_accumulators"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineFn.merge_accumulators">[docs]</a> <span class="k">def</span> <span class="nf">merge_accumulators</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">accumulators</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns the result of merging several accumulators</span>
<span class="sd"> to a single accumulator value.</span>
<span class="sd"> Args:</span>
<span class="sd"> accumulators: the accumulators to merge</span>
<span class="sd"> *args: Additional arguments and side inputs.</span>
<span class="sd"> **kwargs: Additional arguments and side inputs.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div>
<div class="viewcode-block" id="CombineFn.extract_output"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineFn.extract_output">[docs]</a> <span class="k">def</span> <span class="nf">extract_output</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">accumulator</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Return result of converting accumulator into the output value.</span>
<span class="sd"> Args:</span>
<span class="sd"> accumulator: the final accumulator value computed by this CombineFn</span>
<span class="sd"> for the entire input key or PCollection.</span>
<span class="sd"> *args: Additional arguments and side inputs.</span>
<span class="sd"> **kwargs: Additional arguments and side inputs.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div>
<div class="viewcode-block" id="CombineFn.apply"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineFn.apply">[docs]</a> <span class="k">def</span> <span class="nf">apply</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">elements</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns result of applying this CombineFn to the input values.</span>
<span class="sd"> Args:</span>
<span class="sd"> elements: the set of values to combine.</span>
<span class="sd"> *args: Additional arguments and side inputs.</span>
<span class="sd"> **kwargs: Additional arguments and side inputs.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">extract_output</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">add_inputs</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">create_accumulator</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">),</span> <span class="n">elements</span><span class="p">,</span>
<span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">),</span>
<span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span></div>
<div class="viewcode-block" id="CombineFn.for_input_type"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineFn.for_input_type">[docs]</a> <span class="k">def</span> <span class="nf">for_input_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns a specialized implementation of self, if it exists.</span>
<span class="sd"> Otherwise, returns self.</span>
<span class="sd"> Args:</span>
<span class="sd"> input_type: the type of input elements.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="CombineFn.from_callable"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineFn.from_callable">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">from_callable</span><span class="p">(</span><span class="n">fn</span><span class="p">):</span>
<span class="k">return</span> <span class="n">CallableWrapperCombineFn</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span></div>
<div class="viewcode-block" id="CombineFn.maybe_from_callable"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineFn.maybe_from_callable">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">maybe_from_callable</span><span class="p">(</span><span class="n">fn</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="n">CombineFn</span><span class="p">):</span>
<span class="k">return</span> <span class="n">fn</span>
<span class="k">elif</span> <span class="n">callable</span><span class="p">(</span><span class="n">fn</span><span class="p">):</span>
<span class="k">return</span> <span class="n">CallableWrapperCombineFn</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">&#39;Expected a CombineFn or callable, got </span><span class="si">%r</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">fn</span><span class="p">)</span></div>
<div class="viewcode-block" id="CombineFn.get_accumulator_coder"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineFn.get_accumulator_coder">[docs]</a> <span class="k">def</span> <span class="nf">get_accumulator_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="nb">object</span><span class="p">)</span></div>
<span class="n">urns</span><span class="o">.</span><span class="n">RunnerApiFn</span><span class="o">.</span><span class="n">register_pickle_urn</span><span class="p">(</span><span class="n">python_urns</span><span class="o">.</span><span class="n">PICKLED_COMBINE_FN</span><span class="p">)</span></div>
<span class="k">class</span> <span class="nc">CallableWrapperCombineFn</span><span class="p">(</span><span class="n">CombineFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;For internal use only; no backwards-compatibility guarantees.</span>
<span class="sd"> A CombineFn (function) object wrapping a callable object.</span>
<span class="sd"> The purpose of this class is to conveniently wrap simple functions and use</span>
<span class="sd"> them in Combine transforms.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">_EMPTY</span> <span class="o">=</span> <span class="nb">object</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initializes a CallableFn object wrapping a callable.</span>
<span class="sd"> Args:</span>
<span class="sd"> fn: A callable object that reduces elements of an iterable to a single</span>
<span class="sd"> value (like the builtins sum and max). This callable must be capable of</span>
<span class="sd"> receiving the kind of values it generates as output in its input, and</span>
<span class="sd"> for best results, its operation must be commutative and associative.</span>
<span class="sd"> Raises:</span>
<span class="sd"> TypeError: if fn parameter is not a callable type.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">callable</span><span class="p">(</span><span class="n">fn</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">&#39;Expected a callable object instead of: </span><span class="si">%r</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">fn</span><span class="p">)</span>
<span class="nb">super</span><span class="p">(</span><span class="n">CallableWrapperCombineFn</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_fn</span> <span class="o">=</span> <span class="n">fn</span>
<span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="s1">&#39;fn_dd&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">}</span>
<span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s2">&quot;CallableWrapperCombineFn(</span><span class="si">%s</span><span class="s2">)&quot;</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span>
<span class="k">def</span> <span class="nf">create_accumulator</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_EMPTY</span>
<span class="k">def</span> <span class="nf">add_input</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">accumulator</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">if</span> <span class="n">accumulator</span> <span class="ow">is</span> <span class="bp">self</span><span class="o">.</span><span class="n">_EMPTY</span><span class="p">:</span>
<span class="k">return</span> <span class="n">element</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">([</span><span class="n">accumulator</span><span class="p">,</span> <span class="n">element</span><span class="p">],</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">add_inputs</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">accumulator</span><span class="p">,</span> <span class="n">elements</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">if</span> <span class="n">accumulator</span> <span class="ow">is</span> <span class="bp">self</span><span class="o">.</span><span class="n">_EMPTY</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">(</span><span class="n">elements</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">elements</span><span class="p">,</span> <span class="p">(</span><span class="nb">list</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">)):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">([</span><span class="n">accumulator</span><span class="p">]</span> <span class="o">+</span> <span class="nb">list</span><span class="p">(</span><span class="n">elements</span><span class="p">),</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">union</span><span class="p">():</span>
<span class="k">yield</span> <span class="n">accumulator</span>
<span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">elements</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">e</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">(</span><span class="n">union</span><span class="p">(),</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">merge_accumulators</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">accumulators</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="n">filter_fn</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span> <span class="ow">is</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_EMPTY</span>
<span class="k">class</span> <span class="nc">ReiterableNonEmptyAccumulators</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">__iter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">filter</span><span class="p">(</span><span class="n">filter_fn</span><span class="p">,</span> <span class="n">accumulators</span><span class="p">)</span>
<span class="c1"># It&#39;s (weakly) assumed that self._fn is associative.</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">(</span><span class="n">ReiterableNonEmptyAccumulators</span><span class="p">(),</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">extract_output</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">accumulator</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">(())</span> <span class="k">if</span> <span class="n">accumulator</span> <span class="ow">is</span> <span class="bp">self</span><span class="o">.</span><span class="n">_EMPTY</span> <span class="k">else</span> <span class="n">accumulator</span>
<span class="k">def</span> <span class="nf">default_type_hints</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">fn_hints</span> <span class="o">=</span> <span class="n">get_type_hints</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">)</span>
<span class="k">if</span> <span class="n">fn_hints</span><span class="o">.</span><span class="n">input_types</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">fn_hints</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># fn(Iterable[V]) -&gt; V becomes CombineFn(V) -&gt; V</span>
<span class="n">input_args</span><span class="p">,</span> <span class="n">input_kwargs</span> <span class="o">=</span> <span class="n">fn_hints</span><span class="o">.</span><span class="n">input_types</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">input_args</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">input_kwargs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="n">input_args</span><span class="p">,</span> <span class="n">input_kwargs</span> <span class="o">=</span> <span class="nb">tuple</span><span class="p">(</span><span class="n">input_kwargs</span><span class="o">.</span><span class="n">values</span><span class="p">()),</span> <span class="p">{}</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">&#39;Combiner input type must be specified positionally.&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">is_consistent_with</span><span class="p">(</span><span class="n">input_args</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">Any</span><span class="p">]):</span>
<span class="k">raise</span> <span class="n">TypeCheckError</span><span class="p">(</span>
<span class="s1">&#39;All functions for a Combine PTransform must accept a &#39;</span>
<span class="s1">&#39;single argument compatible with: Iterable[Any]. &#39;</span>
<span class="s1">&#39;Instead a function with input type: </span><span class="si">%s</span><span class="s1"> was received.&#39;</span>
<span class="o">%</span> <span class="n">input_args</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="n">input_args</span> <span class="o">=</span> <span class="p">(</span><span class="n">element_type</span><span class="p">(</span><span class="n">input_args</span><span class="p">[</span><span class="mi">0</span><span class="p">]),)</span> <span class="o">+</span> <span class="n">input_args</span><span class="p">[</span><span class="mi">1</span><span class="p">:]</span>
<span class="c1"># TODO(robertwb): Assert output type is consistent with input type?</span>
<span class="n">hints</span> <span class="o">=</span> <span class="n">fn_hints</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="n">hints</span><span class="o">.</span><span class="n">set_input_types</span><span class="p">(</span><span class="o">*</span><span class="n">input_args</span><span class="p">,</span> <span class="o">**</span><span class="n">input_kwargs</span><span class="p">)</span>
<span class="k">return</span> <span class="n">hints</span>
<span class="k">def</span> <span class="nf">for_input_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="p">):</span>
<span class="c1"># Avoid circular imports.</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="k">import</span> <span class="n">cy_combiners</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span> <span class="ow">is</span> <span class="nb">any</span><span class="p">:</span>
<span class="k">return</span> <span class="n">cy_combiners</span><span class="o">.</span><span class="n">AnyCombineFn</span><span class="p">()</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span> <span class="ow">is</span> <span class="nb">all</span><span class="p">:</span>
<span class="k">return</span> <span class="n">cy_combiners</span><span class="o">.</span><span class="n">AllCombineFn</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">known_types</span> <span class="o">=</span> <span class="p">{</span>
<span class="p">(</span><span class="nb">sum</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span> <span class="n">cy_combiners</span><span class="o">.</span><span class="n">SumInt64Fn</span><span class="p">(),</span>
<span class="p">(</span><span class="nb">min</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span> <span class="n">cy_combiners</span><span class="o">.</span><span class="n">MinInt64Fn</span><span class="p">(),</span>
<span class="p">(</span><span class="nb">max</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span> <span class="n">cy_combiners</span><span class="o">.</span><span class="n">MaxInt64Fn</span><span class="p">(),</span>
<span class="p">(</span><span class="nb">sum</span><span class="p">,</span> <span class="nb">float</span><span class="p">):</span> <span class="n">cy_combiners</span><span class="o">.</span><span class="n">SumFloatFn</span><span class="p">(),</span>
<span class="p">(</span><span class="nb">min</span><span class="p">,</span> <span class="nb">float</span><span class="p">):</span> <span class="n">cy_combiners</span><span class="o">.</span><span class="n">MinFloatFn</span><span class="p">(),</span>
<span class="p">(</span><span class="nb">max</span><span class="p">,</span> <span class="nb">float</span><span class="p">):</span> <span class="n">cy_combiners</span><span class="o">.</span><span class="n">MaxFloatFn</span><span class="p">(),</span>
<span class="p">}</span>
<span class="k">return</span> <span class="n">known_types</span><span class="o">.</span><span class="n">get</span><span class="p">((</span><span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">,</span> <span class="n">input_type</span><span class="p">),</span> <span class="bp">self</span><span class="p">)</span>
<div class="viewcode-block" id="PartitionFn"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.PartitionFn">[docs]</a><span class="k">class</span> <span class="nc">PartitionFn</span><span class="p">(</span><span class="n">WithTypeHints</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A function object used by a Partition transform.</span>
<span class="sd"> A PartitionFn specifies how individual values in a PCollection will be placed</span>
<span class="sd"> into separate partitions, indexed by an integer.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="PartitionFn.default_label"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.PartitionFn.default_label">[docs]</a> <span class="k">def</span> <span class="nf">default_label</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span></div>
<div class="viewcode-block" id="PartitionFn.partition_for"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.PartitionFn.partition_for">[docs]</a> <span class="k">def</span> <span class="nf">partition_for</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">num_partitions</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Specify which partition will receive this element.</span>
<span class="sd"> Args:</span>
<span class="sd"> element: An element of the input PCollection.</span>
<span class="sd"> num_partitions: Number of partitions, i.e., output PCollections.</span>
<span class="sd"> *args: optional parameters and side inputs.</span>
<span class="sd"> **kwargs: optional parameters and side inputs.</span>
<span class="sd"> Returns:</span>
<span class="sd"> An integer in [0, num_partitions).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">pass</span></div></div>
<span class="k">class</span> <span class="nc">CallableWrapperPartitionFn</span><span class="p">(</span><span class="n">PartitionFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;For internal use only; no backwards-compatibility guarantees.</span>
<span class="sd"> A PartitionFn object wrapping a callable object.</span>
<span class="sd"> Instances of this class wrap simple functions for use in Partition operations.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initializes a PartitionFn object wrapping a callable.</span>
<span class="sd"> Args:</span>
<span class="sd"> fn: A callable object, which should accept the following arguments:</span>
<span class="sd"> element - element to assign to a partition.</span>
<span class="sd"> num_partitions - number of output partitions.</span>
<span class="sd"> and may accept additional arguments and side inputs.</span>
<span class="sd"> Raises:</span>
<span class="sd"> TypeError: if fn is not a callable type.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">callable</span><span class="p">(</span><span class="n">fn</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">&#39;Expected a callable object instead of: </span><span class="si">%r</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">fn</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_fn</span> <span class="o">=</span> <span class="n">fn</span>
<span class="k">def</span> <span class="nf">partition_for</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">num_partitions</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">num_partitions</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<div class="viewcode-block" id="ParDo"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo">[docs]</a><span class="k">class</span> <span class="nc">ParDo</span><span class="p">(</span><span class="n">PTransformWithSideInputs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A :class:`ParDo` transform.</span>
<span class="sd"> Processes an input :class:`~apache_beam.pvalue.PCollection` by applying a</span>
<span class="sd"> :class:`DoFn` to each element and returning the accumulated results into an</span>
<span class="sd"> output :class:`~apache_beam.pvalue.PCollection`. The type of the elements is</span>
<span class="sd"> not fixed as long as the :class:`DoFn` can deal with it. In reality the type</span>
<span class="sd"> is restrained to some extent because the elements sometimes must be persisted</span>
<span class="sd"> to external storage. See the :meth:`.expand()` method comments for a</span>
<span class="sd"> detailed description of all possible arguments.</span>
<span class="sd"> Note that the :class:`DoFn` must return an iterable for each element of the</span>
<span class="sd"> input :class:`~apache_beam.pvalue.PCollection`. An easy way to do this is to</span>
<span class="sd"> use the ``yield`` keyword in the process method.</span>
<span class="sd"> Args:</span>
<span class="sd"> pcoll (~apache_beam.pvalue.PCollection):</span>
<span class="sd"> a :class:`~apache_beam.pvalue.PCollection` to be processed.</span>
<span class="sd"> fn (DoFn): a :class:`DoFn` object to be applied to each element</span>
<span class="sd"> of **pcoll** argument.</span>
<span class="sd"> *args: positional arguments passed to the :class:`DoFn` object.</span>
<span class="sd"> **kwargs: keyword arguments passed to the :class:`DoFn` object.</span>
<span class="sd"> Note that the positional and keyword arguments will be processed in order</span>
<span class="sd"> to detect :class:`~apache_beam.pvalue.PCollection` s that will be computed as</span>
<span class="sd"> side inputs to the transform. During pipeline execution whenever the</span>
<span class="sd"> :class:`DoFn` object gets executed (its :meth:`DoFn.process()` method gets</span>
<span class="sd"> called) the :class:`~apache_beam.pvalue.PCollection` arguments will be</span>
<span class="sd"> replaced by values from the :class:`~apache_beam.pvalue.PCollection` in the</span>
<span class="sd"> exact positions where they appear in the argument lists.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<!-- ParDo.__init__ (rendered Sphinx viewcode listing of the Python module;
  edit the .py source and rebuild rather than changing this markup).
  Passes fn, *args and **kwargs to the superclass constructor, aliases
  self.dofn to self.fn (kept until callers migrate to fn, per the TODO) and
  starts self.output_tags as an empty set. Raises TypeError unless self.fn
  is a DoFn instance; note the check only runs after the superclass
  constructor has already been called. Finally builds a DoFnSignature
  (imported locally from apache_beam.runners.common) to validate the DoFn
  and caches it on self._signature, which expand later consults. --><span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fn</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="nb">super</span><span class="p">(</span><span class="n">ParDo</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="c1"># TODO(robertwb): Change all uses of the dofn attribute to use fn instead.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dofn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">fn</span>
<span class="bp">self</span><span class="o">.</span><span class="n">output_tags</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">,</span> <span class="n">DoFn</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">&#39;ParDo must be called with a DoFn instance.&#39;</span><span class="p">)</span>
<span class="c1"># Validate the DoFn by creating a DoFnSignature</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.common</span> <span class="k">import</span> <span class="n">DoFnSignature</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_signature</span> <span class="o">=</span> <span class="n">DoFnSignature</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">)</span>
<!-- ParDo.default_type_hints: a ParDo's default type hints are simply those
  reported by its wrapped fn via get_type_hints(). The viewcode-block div
  links back to the API reference entry through the [docs] anchor. --><div class="viewcode-block" id="ParDo.default_type_hints"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo.default_type_hints">[docs]</a> <span class="k">def</span> <span class="nf">default_type_hints</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="o">.</span><span class="n">get_type_hints</span><span class="p">()</span></div>
<!-- ParDo.infer_output_type: asks the wrapped fn for its output type given
  input_type, then passes that result through
  trivial_inference.element_type (presumably to unwrap the per-element type
  from the iterable a DoFn produces; confirm upstream). --><div class="viewcode-block" id="ParDo.infer_output_type"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo.infer_output_type">[docs]</a> <span class="k">def</span> <span class="nf">infer_output_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="p">):</span>
<span class="k">return</span> <span class="n">trivial_inference</span><span class="o">.</span><span class="n">element_type</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="o">.</span><span class="n">infer_output_type</span><span class="p">(</span><span class="n">input_type</span><span class="p">))</span></div>
<!-- ParDo.make_fn: returns fn unchanged when it is already a DoFn;
  otherwise wraps it in a CallableWrapperDoFn so plain callables can be
  used as the transform function. --><div class="viewcode-block" id="ParDo.make_fn"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo.make_fn">[docs]</a> <span class="k">def</span> <span class="nf">make_fn</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fn</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="n">DoFn</span><span class="p">):</span>
<span class="k">return</span> <span class="n">fn</span>
<span class="k">return</span> <span class="n">CallableWrapperDoFn</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span></div>
<!-- ParDo._process_argspec_fn: delegates to the wrapped fn's own
  _process_argspec_fn(). It is private, so Sphinx emits no viewcode-block
  div or [docs] link for it. --><span class="k">def</span> <span class="nf">_process_argspec_fn</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="o">.</span><span class="n">_process_argspec_fn</span><span class="p">()</span>
<!-- ParDo.display_data: returns a dict with two entries. 'fn' is a
  DisplayDataItem holding the wrapped fn's class, labelled
  'Transform Function'. 'fn_dd' is the fn object itself, presumably so the
  fn's own display data is collected as well (confirm upstream). --><div class="viewcode-block" id="ParDo.display_data"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="s1">&#39;fn&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span>
<span class="n">label</span><span class="o">=</span><span class="s1">&#39;Transform Function&#39;</span><span class="p">),</span>
<span class="s1">&#39;fn_dd&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">}</span></div>
<!-- ParDo.expand: returns a fresh PCollection on the input's pipeline.
  Only for a stateful DoFn (per self._signature) does it first pick a key
  coder:
  - If pcoll.element_type is set and is not typehints.Any, it looks up the
    registry coder for that hint. If that coder is not a KV coder, it
    raises ValueError (a hard error, despite the source comment only
    mentioning a warning). Otherwise it uses that coder's key coder.
  - Otherwise it falls back to the registry coder for typehints.Any.
  If the chosen key coder is not deterministic, it only logs a warning
  suggesting an input type hint. --><div class="viewcode-block" id="ParDo.expand"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="c1"># In the case of a stateful DoFn, warn if the key coder is not</span>
<span class="c1"># deterministic.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_signature</span><span class="o">.</span><span class="n">is_stateful_dofn</span><span class="p">():</span>
<span class="n">kv_type_hint</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span>
<span class="k">if</span> <span class="n">kv_type_hint</span> <span class="ow">and</span> <span class="n">kv_type_hint</span> <span class="o">!=</span> <span class="n">typehints</span><span class="o">.</span><span class="n">Any</span><span class="p">:</span>
<span class="n">coder</span> <span class="o">=</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="n">kv_type_hint</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">coder</span><span class="o">.</span><span class="n">is_kv_coder</span><span class="p">():</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;Input elements to the transform </span><span class="si">%s</span><span class="s1"> with stateful DoFn must be &#39;</span>
<span class="s1">&#39;key-value pairs.&#39;</span> <span class="o">%</span> <span class="bp">self</span><span class="p">)</span>
<span class="n">key_coder</span> <span class="o">=</span> <span class="n">coder</span><span class="o">.</span><span class="n">key_coder</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">key_coder</span> <span class="o">=</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="n">typehints</span><span class="o">.</span><span class="n">Any</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">key_coder</span><span class="o">.</span><span class="n">is_deterministic</span><span class="p">():</span>
<span class="n">logging</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;Key coder </span><span class="si">%s</span><span class="s1"> for transform </span><span class="si">%s</span><span class="s1"> with stateful DoFn may not &#39;</span>
<span class="s1">&#39;be deterministic. This may cause incorrect behavior for complex &#39;</span>
<span class="s1">&#39;key types. Consider adding an input type hint for this transform.&#39;</span><span class="p">,</span>
<span class="n">key_coder</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span>
<span class="k">return</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span></div>
<!-- ParDo.with_outputs: pops the optional 'main' keyword as the main-output
  tag, raises ValueError listing any other keyword arguments, and returns a
  _MultiParDo(self, tags, main_tag) transform.
  NOTE(review): this is a rendered copy of the upstream module; fix there.
  The docstring's Raises entry for TypeError (about self not being a
  PCollection) is not produced by this body, which itself raises only
  ValueError. Also, the value returned here is the _MultiParDo transform;
  the DoOutputsTuple the docstring describes is what applying that
  transform to a PCollection yields. Reconcile the docstring upstream. --><div class="viewcode-block" id="ParDo.with_outputs"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo.with_outputs">[docs]</a> <span class="k">def</span> <span class="nf">with_outputs</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">tags</span><span class="p">,</span> <span class="o">**</span><span class="n">main_kw</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns a tagged tuple allowing access to the outputs of a</span>
<span class="sd"> :class:`ParDo`.</span>
<span class="sd"> The resulting object supports access to the</span>
<span class="sd"> :class:`~apache_beam.pvalue.PCollection` associated with a tag</span>
<span class="sd"> (e.g. ``o.tag``, ``o[tag]``) and iterating over the available tags</span>
<span class="sd"> (e.g. ``for tag in o: ...``).</span>
<span class="sd"> Args:</span>
<span class="sd"> *tags: if non-empty, list of valid tags. If a list of valid tags is given,</span>
<span class="sd"> it will be an error to use an undeclared tag later in the pipeline.</span>
<span class="sd"> **main_kw: dictionary empty or with one key ``&#39;main&#39;`` defining the tag to</span>
<span class="sd"> be used for the main output (which will not have a tag associated with</span>
<span class="sd"> it).</span>
<span class="sd"> Returns:</span>
<span class="sd"> ~apache_beam.pvalue.DoOutputsTuple: An object of type</span>
<span class="sd"> :class:`~apache_beam.pvalue.DoOutputsTuple` that bundles together all</span>
<span class="sd"> the outputs of a :class:`ParDo` transform and allows accessing the</span>
<span class="sd"> individual :class:`~apache_beam.pvalue.PCollection` s for each output</span>
<span class="sd"> using an ``object.tag`` syntax.</span>
<span class="sd"> Raises:</span>
<span class="sd"> ~exceptions.TypeError: if the **self** object is not a</span>
<span class="sd"> :class:`~apache_beam.pvalue.PCollection` that is the result of a</span>
<span class="sd"> :class:`ParDo` transform.</span>
<span class="sd"> ~exceptions.ValueError: if **main_kw** contains any key other than</span>
<span class="sd"> ``&#39;main&#39;``.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">main_tag</span> <span class="o">=</span> <span class="n">main_kw</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">&#39;main&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">if</span> <span class="n">main_kw</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Unexpected keyword arguments: </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span>
<span class="nb">list</span><span class="p">(</span><span class="n">main_kw</span><span class="p">))</span>
<span class="k">return</span> <span class="n">_MultiParDo</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tags</span><span class="p">,</span> <span class="n">main_tag</span><span class="p">)</span></div>
<!-- ParDo._pardo_fn_data: builds the 5-tuple that to_runner_api_parameter
  pickles, namely (fn, args, kwargs, si_tags_and_types, windowing). The last
  two are always None for a plain ParDo; from_runner_api_parameter raises
  NotImplementedError if either is truthy. --><span class="k">def</span> <span class="nf">_pardo_fn_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">si_tags_and_types</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">windowing</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">,</span> <span class="n">si_tags_and_types</span><span class="p">,</span> <span class="n">windowing</span>
<!-- ParDo.to_runner_api_parameter: serialises this ParDo into a
  (PAR_DO urn, ParDoPayload) pair.
  - The do_fn is an SdkFunctionSpec in the context's default environment.
    Its FunctionSpec uses the PICKLED_DOFN_INFO urn, and its payload is the
    pickled _pardo_fn_data() tuple.
  - State and timer specs come from userstate.get_dofn_specs(self.fn) and
    are keyed by spec name.
  - Side inputs are keyed "side0", "side1", ... in the order of
    self.side_inputs. from_runner_api_parameter parses that index back to
    restore the order. --><div class="viewcode-block" id="ParDo.to_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo.to_runner_api_parameter">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ParDo</span><span class="p">),</span> \
<span class="s2">&quot;expected instance of ParDo, but got </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span>
<!-- NOTE(review): the local name picked_pardo_fn_data presumably means
  "pickled"; rename in the upstream module, not in this rendered copy. --><span class="n">picked_pardo_fn_data</span> <span class="o">=</span> <span class="n">pickler</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_pardo_fn_data</span><span class="p">())</span>
<span class="n">state_specs</span><span class="p">,</span> <span class="n">timer_specs</span> <span class="o">=</span> <span class="n">userstate</span><span class="o">.</span><span class="n">get_dofn_specs</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">)</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">primitives</span><span class="o">.</span><span class="n">PAR_DO</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ParDoPayload</span><span class="p">(</span>
<span class="n">do_fn</span><span class="o">=</span><span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">SdkFunctionSpec</span><span class="p">(</span>
<span class="n">environment_id</span><span class="o">=</span><span class="n">context</span><span class="o">.</span><span class="n">default_environment_id</span><span class="p">(),</span>
<span class="n">spec</span><span class="o">=</span><span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">FunctionSpec</span><span class="p">(</span>
<span class="n">urn</span><span class="o">=</span><span class="n">python_urns</span><span class="o">.</span><span class="n">PICKLED_DOFN_INFO</span><span class="p">,</span>
<span class="n">payload</span><span class="o">=</span><span class="n">picked_pardo_fn_data</span><span class="p">)),</span>
<span class="n">state_specs</span><span class="o">=</span><span class="p">{</span><span class="n">spec</span><span class="o">.</span><span class="n">name</span><span class="p">:</span> <span class="n">spec</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
<span class="k">for</span> <span class="n">spec</span> <span class="ow">in</span> <span class="n">state_specs</span><span class="p">},</span>
<span class="n">timer_specs</span><span class="o">=</span><span class="p">{</span><span class="n">spec</span><span class="o">.</span><span class="n">name</span><span class="p">:</span> <span class="n">spec</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
<span class="k">for</span> <span class="n">spec</span> <span class="ow">in</span> <span class="n">timer_specs</span><span class="p">},</span>
<span class="c1"># It&#39;d be nice to name these according to their actual</span>
<!-- NOTE(review): "orignal" in the next displayed comment is a typo for
  "original" in the upstream module; fix it there so the rebuilt page
  picks it up. --><span class="c1"># names/positions in the orignal argument list, but such a</span>
<span class="c1"># transformation is currently irreversible given how</span>
<span class="c1"># remove_objects_from_args and insert_values_in_args</span>
<span class="c1"># are currently implemented.</span>
<span class="n">side_inputs</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;side</span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">ix</span><span class="p">:</span> <span class="n">si</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
<span class="k">for</span> <span class="n">ix</span><span class="p">,</span> <span class="n">si</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">)}))</span></div>
<!-- ParDo.from_runner_api_parameter: the inverse of to_runner_api_parameter.
  It is registered through PTransform.register_urn for the PAR_DO urn and
  ParDoPayload. It asserts the do_fn spec uses the PICKLED_DOFN_INFO urn,
  unpickles (fn, args, kwargs, si_tags_and_types, windowing), and raises
  NotImplementedError if explicit side-input data or windowing is present.
  It then rebuilds ParDo(fn, *args, **kwargs) and restores side_inputs
  ordered by the integer N parsed from each "sideN" tag (an optional "-..."
  suffix is allowed). The final closing div here also ends the enclosing
  ParDo class viewcode block. --><div class="viewcode-block" id="ParDo.from_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo.from_runner_api_parameter">[docs]</a> <span class="nd">@PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">primitives</span><span class="o">.</span><span class="n">PAR_DO</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ParDoPayload</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">from_runner_api_parameter</span><span class="p">(</span><span class="n">pardo_payload</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">assert</span> <span class="n">pardo_payload</span><span class="o">.</span><span class="n">do_fn</span><span class="o">.</span><span class="n">spec</span><span class="o">.</span><span class="n">urn</span> <span class="o">==</span> <span class="n">python_urns</span><span class="o">.</span><span class="n">PICKLED_DOFN_INFO</span>
<!-- NOTE(review): this unpickles the payload, which can execute arbitrary
  code. It is only safe for pipeline protos from a trusted source. --><span class="n">fn</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">,</span> <span class="n">si_tags_and_types</span><span class="p">,</span> <span class="n">windowing</span> <span class="o">=</span> <span class="n">pickler</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span>
<span class="n">pardo_payload</span><span class="o">.</span><span class="n">do_fn</span><span class="o">.</span><span class="n">spec</span><span class="o">.</span><span class="n">payload</span><span class="p">)</span>
<span class="k">if</span> <span class="n">si_tags_and_types</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="s1">&#39;explicit side input data&#39;</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">windowing</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="s1">&#39;explicit windowing&#39;</span><span class="p">)</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">ParDo</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="c1"># This is an ordered list stored as a dict (see the comments in</span>
<span class="c1"># to_runner_api_parameter above).</span>
<span class="n">indexed_side_inputs</span> <span class="o">=</span> <span class="p">[</span>
<!-- NOTE(review): the re.match result is used without a None check, so a
  side-input tag that does not match "sideN" raises AttributeError instead
  of a descriptive error. Consider validating it in the upstream module. --><span class="p">(</span><span class="nb">int</span><span class="p">(</span><span class="n">re</span><span class="o">.</span><span class="n">match</span><span class="p">(</span><span class="s1">&#39;side([0-9]+)(-.*)?$&#39;</span><span class="p">,</span> <span class="n">tag</span><span class="p">)</span><span class="o">.</span><span class="n">group</span><span class="p">(</span><span class="mi">1</span><span class="p">)),</span>
<span class="n">pvalue</span><span class="o">.</span><span class="n">AsSideInput</span><span class="o">.</span><span class="n">from_runner_api</span><span class="p">(</span><span class="n">si</span><span class="p">,</span> <span class="n">context</span><span class="p">))</span>
<span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">si</span> <span class="ow">in</span> <span class="n">pardo_payload</span><span class="o">.</span><span class="n">side_inputs</span><span class="o">.</span><span class="n">items</span><span class="p">()]</span>
<span class="n">result</span><span class="o">.</span><span class="n">side_inputs</span> <span class="o">=</span> <span class="p">[</span><span class="n">si</span> <span class="k">for</span> <span class="n">_</span><span class="p">,</span> <span class="n">si</span> <span class="ow">in</span> <span class="nb">sorted</span><span class="p">(</span><span class="n">indexed_side_inputs</span><span class="p">)]</span>
<span class="k">return</span> <span class="n">result</span></div></div>
<!-- _MultiParDo: private PTransform returned by ParDo.with_outputs. It
  reuses the wrapped ParDo's label and stores the ParDo, the declared tags
  and the main tag. expand applies the ParDo to pcoll (the application
  result is deliberately discarded) and returns a DoOutputsTuple built from
  the pipeline, the ParDo, the tags and the main tag. Presumably the tuple
  resolves the per-tag output PCollections from the recorded application;
  confirm in apache_beam.pvalue. No [docs] link is rendered because the
  class is private. --><span class="k">class</span> <span class="nc">_MultiParDo</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">do_transform</span><span class="p">,</span> <span class="n">tags</span><span class="p">,</span> <span class="n">main_tag</span><span class="p">):</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_MultiParDo</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">do_transform</span><span class="o">.</span><span class="n">label</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_do_transform</span> <span class="o">=</span> <span class="n">do_transform</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_tags</span> <span class="o">=</span> <span class="n">tags</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_main_tag</span> <span class="o">=</span> <span class="n">main_tag</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="n">_</span> <span class="o">=</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">_do_transform</span>
<span class="k">return</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">DoOutputsTuple</span><span class="p">(</span>
<span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_do_transform</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_tags</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_main_tag</span><span class="p">)</span>
<!-- FlatMap: module-level factory (not a class, hence the pylint
  invalid-name suppression). It raises TypeError if fn is not callable,
  otherwise wraps fn in CallableWrapperDoFn inside a ParDo with the given
  args/kwargs and labels it "FlatMap(...)" using
  ptransform.label_from_callable(fn).
  NOTE(review): the label is computed before the callable check.
  Presumably label_from_callable tolerates non-callables; if not, the
  friendlier TypeError below is never reached. Confirm upstream. --><div class="viewcode-block" id="FlatMap"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.FlatMap">[docs]</a><span class="k">def</span> <span class="nf">FlatMap</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> <span class="c1"># pylint: disable=invalid-name</span>
<span class="sd">&quot;&quot;&quot;:func:`FlatMap` is like :class:`ParDo` except it takes a callable to</span>
<span class="sd"> specify the transformation.</span>
<span class="sd"> The callable must return an iterable for each element of the input</span>
<span class="sd"> :class:`~apache_beam.pvalue.PCollection`. The elements of these iterables will</span>
<span class="sd"> be flattened into the output :class:`~apache_beam.pvalue.PCollection`.</span>
<span class="sd"> Args:</span>
<span class="sd"> fn (callable): a callable object.</span>
<span class="sd"> *args: positional arguments passed to the transform callable.</span>
<span class="sd"> **kwargs: keyword arguments passed to the transform callable.</span>
<span class="sd"> Returns:</span>
<span class="sd"> ~apache_beam.pvalue.PCollection:</span>
<span class="sd"> A :class:`~apache_beam.pvalue.PCollection` containing the</span>
<span class="sd"> :func:`FlatMap` outputs.</span>
<span class="sd"> Raises:</span>
<span class="sd"> ~exceptions.TypeError: If the **fn** passed as argument is not a callable.</span>
<span class="sd"> Typical error is to pass a :class:`DoFn` instance which is supported only</span>
<span class="sd"> for :class:`ParDo`.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">label</span> <span class="o">=</span> <span class="s1">&#39;FlatMap(</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">label_from_callable</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">callable</span><span class="p">(</span><span class="n">fn</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span>
<span class="s1">&#39;FlatMap can be used only with callable objects. &#39;</span>
<span class="s1">&#39;Received </span><span class="si">%r</span><span class="s1"> instead.&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">fn</span><span class="p">))</span>
<span class="n">pardo</span> <span class="o">=</span> <span class="n">ParDo</span><span class="p">(</span><span class="n">CallableWrapperDoFn</span><span class="p">(</span><span class="n">fn</span><span class="p">),</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="n">pardo</span><span class="o">.</span><span class="n">label</span> <span class="o">=</span> <span class="n">label</span>
<span class="k">return</span> <span class="n">pardo</span></div>
<!-- Map: raises TypeError if fn is not callable. Otherwise it wraps fn in a
  lambda returning a one-element list, so it can reuse FlatMap (whose
  callable must return an iterable per element).
  - The wrapper forwards extra positional/keyword arguments only when
    _fn_takes_side_inputs(fn) is true; otherwise it accepts just the
    element.
  - fn.__name__ is copied onto the wrapper when present. Callable objects
    without __name__ keep the lambda's name, as the TODO notes.
  - fn's input type hints are copied to the wrapper. A simple output hint
    is wrapped in typehints.Iterable to match the list the wrapper returns.
  - wrapper._argspec_fn = fn presumably lets argspec inspection see fn's
    signature instead of the lambda's (confirm upstream).
  The resulting ParDo's label is overwritten with "Map(...)". --><div class="viewcode-block" id="Map"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Map">[docs]</a><span class="k">def</span> <span class="nf">Map</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> <span class="c1"># pylint: disable=invalid-name</span>
<span class="sd">&quot;&quot;&quot;:func:`Map` is like :func:`FlatMap` except its callable returns only a</span>
<span class="sd"> single element.</span>
<span class="sd"> Args:</span>
<span class="sd"> fn (callable): a callable object.</span>
<span class="sd"> *args: positional arguments passed to the transform callable.</span>
<span class="sd"> **kwargs: keyword arguments passed to the transform callable.</span>
<span class="sd"> Returns:</span>
<span class="sd"> ~apache_beam.pvalue.PCollection:</span>
<span class="sd"> A :class:`~apache_beam.pvalue.PCollection` containing the</span>
<span class="sd"> :func:`Map` outputs.</span>
<span class="sd"> Raises:</span>
<span class="sd"> ~exceptions.TypeError: If the **fn** passed as argument is not a callable.</span>
<span class="sd"> Typical error is to pass a :class:`DoFn` instance which is supported only</span>
<span class="sd"> for :class:`ParDo`.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">callable</span><span class="p">(</span><span class="n">fn</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span>
<span class="s1">&#39;Map can be used only with callable objects. &#39;</span>
<span class="s1">&#39;Received </span><span class="si">%r</span><span class="s1"> instead.&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">fn</span><span class="p">))</span>
<span class="k">if</span> <span class="n">_fn_takes_side_inputs</span><span class="p">(</span><span class="n">fn</span><span class="p">):</span>
<span class="n">wrapper</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="p">[</span><span class="n">fn</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)]</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">wrapper</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">[</span><span class="n">fn</span><span class="p">(</span><span class="n">x</span><span class="p">)]</span>
<span class="n">label</span> <span class="o">=</span> <span class="s1">&#39;Map(</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">label_from_callable</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span>
<span class="c1"># TODO. What about callable classes?</span>
<span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="s1">&#39;__name__&#39;</span><span class="p">):</span>
<span class="n">wrapper</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">=</span> <span class="n">fn</span><span class="o">.</span><span class="vm">__name__</span>
<span class="c1"># Proxy the type-hint information from the original function to this new</span>
<span class="c1"># wrapped function.</span>
<span class="n">get_type_hints</span><span class="p">(</span><span class="n">wrapper</span><span class="p">)</span><span class="o">.</span><span class="n">input_types</span> <span class="o">=</span> <span class="n">get_type_hints</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span><span class="o">.</span><span class="n">input_types</span>
<span class="n">output_hint</span> <span class="o">=</span> <span class="n">get_type_hints</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span><span class="o">.</span><span class="n">simple_output_type</span><span class="p">(</span><span class="n">label</span><span class="p">)</span>
<span class="k">if</span> <span class="n">output_hint</span><span class="p">:</span>
<span class="n">get_type_hints</span><span class="p">(</span><span class="n">wrapper</span><span class="p">)</span><span class="o">.</span><span class="n">set_output_types</span><span class="p">(</span><span class="n">typehints</span><span class="o">.</span><span class="n">Iterable</span><span class="p">[</span><span class="n">output_hint</span><span class="p">])</span>
<span class="c1"># pylint: disable=protected-access</span>
<span class="n">wrapper</span><span class="o">.</span><span class="n">_argspec_fn</span> <span class="o">=</span> <span class="n">fn</span>
<span class="c1"># pylint: enable=protected-access</span>
<span class="n">pardo</span> <span class="o">=</span> <span class="n">FlatMap</span><span class="p">(</span><span class="n">wrapper</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="n">pardo</span><span class="o">.</span><span class="n">label</span> <span class="o">=</span> <span class="n">label</span>
<span class="k">return</span> <span class="n">pardo</span></div>
<!-- NOTE(review): this is a generated Sphinx viewcode mirror of apache_beam/transforms/core.py.
Fix problems in the Python module and regenerate; do not hand-edit the highlighted markup below.
Filter raises TypeError for a non-callable fn. Otherwise it wraps fn in a lambda that returns [x]
when fn(x, *args, **kwargs) is truthy and [] otherwise, and passes that wrapper to FlatMap with the
label 'Filter(...)' built from ptransform.label_from_callable(fn).
Type hints: the wrapper copies fn's input hints. If fn declares no simple output hint, the first input
hint is reused, so the wrapper's output is typed as Iterable of the input type.
The wrapper's __name__ is copied only when fn has one; the upstream TODO notes that callable class
instances are not specially handled. -->
<div class="viewcode-block" id="Filter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Filter">[docs]</a><span class="k">def</span> <span class="nf">Filter</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> <span class="c1"># pylint: disable=invalid-name</span>
<span class="sd">&quot;&quot;&quot;:func:`Filter` is a :func:`FlatMap` with its callable filtering out</span>
<span class="sd"> elements.</span>
<span class="sd"> Args:</span>
<span class="sd"> fn (callable): a callable object.</span>
<span class="sd"> *args: positional arguments passed to the transform callable.</span>
<span class="sd"> **kwargs: keyword arguments passed to the transform callable.</span>
<span class="sd"> Returns:</span>
<span class="sd"> ~apache_beam.pvalue.PCollection:</span>
<span class="sd"> A :class:`~apache_beam.pvalue.PCollection` containing the</span>
<span class="sd"> :func:`Filter` outputs.</span>
<span class="sd"> Raises:</span>
<span class="sd"> ~exceptions.TypeError: If the **fn** passed as argument is not a callable.</span>
<span class="sd"> Typical error is to pass a :class:`DoFn` instance which is supported only</span>
<span class="sd"> for :class:`ParDo`.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">callable</span><span class="p">(</span><span class="n">fn</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span>
<span class="s1">&#39;Filter can be used only with callable objects. &#39;</span>
<span class="s1">&#39;Received </span><span class="si">%r</span><span class="s1"> instead.&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">fn</span><span class="p">))</span>
<span class="n">wrapper</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="p">[</span><span class="n">x</span><span class="p">]</span> <span class="k">if</span> <span class="n">fn</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> <span class="k">else</span> <span class="p">[]</span>
<span class="n">label</span> <span class="o">=</span> <span class="s1">&#39;Filter(</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">label_from_callable</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span>
<span class="c1"># TODO: What about callable classes?</span>
<span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="s1">&#39;__name__&#39;</span><span class="p">):</span>
<span class="n">wrapper</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">=</span> <span class="n">fn</span><span class="o">.</span><span class="vm">__name__</span>
<span class="c1"># Proxy the type-hint information from the function being wrapped, setting the</span>
<span class="c1"># output type to be the same as the input type.</span>
<span class="n">get_type_hints</span><span class="p">(</span><span class="n">wrapper</span><span class="p">)</span><span class="o">.</span><span class="n">input_types</span> <span class="o">=</span> <span class="n">get_type_hints</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span><span class="o">.</span><span class="n">input_types</span>
<span class="n">output_hint</span> <span class="o">=</span> <span class="n">get_type_hints</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span><span class="o">.</span><span class="n">simple_output_type</span><span class="p">(</span><span class="n">label</span><span class="p">)</span>
<span class="k">if</span> <span class="p">(</span><span class="n">output_hint</span> <span class="ow">is</span> <span class="kc">None</span>
<span class="ow">and</span> <span class="n">get_type_hints</span><span class="p">(</span><span class="n">wrapper</span><span class="p">)</span><span class="o">.</span><span class="n">input_types</span>
<span class="ow">and</span> <span class="n">get_type_hints</span><span class="p">(</span><span class="n">wrapper</span><span class="p">)</span><span class="o">.</span><span class="n">input_types</span><span class="p">[</span><span class="mi">0</span><span class="p">]):</span>
<span class="n">output_hint</span> <span class="o">=</span> <span class="n">get_type_hints</span><span class="p">(</span><span class="n">wrapper</span><span class="p">)</span><span class="o">.</span><span class="n">input_types</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">if</span> <span class="n">output_hint</span><span class="p">:</span>
<span class="n">get_type_hints</span><span class="p">(</span><span class="n">wrapper</span><span class="p">)</span><span class="o">.</span><span class="n">set_output_types</span><span class="p">(</span><span class="n">typehints</span><span class="o">.</span><span class="n">Iterable</span><span class="p">[</span><span class="n">output_hint</span><span class="p">])</span>
<span class="c1"># pylint: disable=protected-access</span>
<span class="n">wrapper</span><span class="o">.</span><span class="n">_argspec_fn</span> <span class="o">=</span> <span class="n">fn</span>
<span class="c1"># pylint: enable=protected-access</span>
<span class="n">pardo</span> <span class="o">=</span> <span class="n">FlatMap</span><span class="p">(</span><span class="n">wrapper</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="n">pardo</span><span class="o">.</span><span class="n">label</span> <span class="o">=</span> <span class="n">label</span>
<span class="k">return</span> <span class="n">pardo</span></div>
<!-- _combine_payload builds a beam_runner_api_pb2.CombinePayload for combine_fn.
The fn itself is serialized with combine_fn.to_runner_api(context).
Its accumulator coder (combine_fn.get_accumulator_coder()) is turned into an id through
context.coders.get_id(...), presumably registering it in the context; confirm upstream.
That id is stored as accumulator_coder_id. -->
<span class="k">def</span> <span class="nf">_combine_payload</span><span class="p">(</span><span class="n">combine_fn</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">return</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">CombinePayload</span><span class="p">(</span>
<span class="n">combine_fn</span><span class="o">=</span><span class="n">combine_fn</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">(</span><span class="n">context</span><span class="p">),</span>
<span class="n">accumulator_coder_id</span><span class="o">=</span><span class="n">context</span><span class="o">.</span><span class="n">coders</span><span class="o">.</span><span class="n">get_id</span><span class="p">(</span>
<span class="n">combine_fn</span><span class="o">.</span><span class="n">get_accumulator_coder</span><span class="p">()))</span>
<!-- NOTE(review): generated Sphinx viewcode mirror of apache_beam/transforms/core.py.
Fix the issues below in the Python module and regenerate; do not hand-edit the highlighted markup.
Builders: with_fanout, with_defaults, without_defaults and as_singleton_view return a shallow copy
(copy.copy plus a __dict__ update) with the attribute changed; the original transform is untouched.
CombineGlobally.expand works in three steps:
1. It keys every element with None (KeyWithVoid).
2. It runs CombinePerKey, adding hot key fanout when self.fanout is set.
3. It drops the key again (UnKey).
If has_defaults is false and as_view is false, that combined PCollection is returned as is.
Otherwise the default value is computed:
- with has_defaults, it is combine_fn.apply([], *args, **kwargs);
- without it, it is AsSingleton._NO_DEFAULT.
The combined result is then wrapped in pvalue.AsSingleton with that default.
as_singleton_view() returns that view. The plain default path instead re-emits the singleton through
Create([None]) plus a Map over the view. That path raises ValueError unless the input windowfn equals
GlobalWindows().
NOTE(review): the docstring documents only TypeError; the ValueError raised by expand is not mentioned.
NOTE(review): the docstring lists pcoll under Args, but __init__ takes only fn, *args and **kwargs;
pcoll is the input later passed to expand.
NOTE(review): the TypeError message says "combineFn objects", but __init__ also accepts any callable. -->
<div class="viewcode-block" id="CombineGlobally"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineGlobally">[docs]</a><span class="k">class</span> <span class="nc">CombineGlobally</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A :class:`CombineGlobally` transform.</span>
<span class="sd"> Reduces a :class:`~apache_beam.pvalue.PCollection` to a single value by</span>
<span class="sd"> progressively applying a :class:`CombineFn` to portions of the</span>
<span class="sd"> :class:`~apache_beam.pvalue.PCollection` (and to intermediate values created</span>
<span class="sd"> thereby). See documentation in :class:`CombineFn` for details on the specifics</span>
<span class="sd"> on how :class:`CombineFn` s are applied.</span>
<span class="sd"> Args:</span>
<span class="sd"> pcoll (~apache_beam.pvalue.PCollection):</span>
<span class="sd"> a :class:`~apache_beam.pvalue.PCollection` to be reduced into a single</span>
<span class="sd"> value.</span>
<span class="sd"> fn (callable): a :class:`CombineFn` object that will be called to</span>
<span class="sd"> progressively reduce the :class:`~apache_beam.pvalue.PCollection` into</span>
<span class="sd"> single values, or a callable suitable for wrapping by</span>
<span class="sd"> :class:`~apache_beam.transforms.core.CallableWrapperCombineFn`.</span>
<span class="sd"> *args: positional arguments passed to the :class:`CombineFn` object.</span>
<span class="sd"> **kwargs: keyword arguments passed to the :class:`CombineFn` object.</span>
<span class="sd"> Raises:</span>
<span class="sd"> ~exceptions.TypeError: If the output type of the input</span>
<span class="sd"> :class:`~apache_beam.pvalue.PCollection` is not compatible</span>
<span class="sd"> with ``Iterable[A]``.</span>
<span class="sd"> Returns:</span>
<span class="sd"> ~apache_beam.pvalue.PCollection: A single-element</span>
<span class="sd"> :class:`~apache_beam.pvalue.PCollection` containing the main output of</span>
<span class="sd"> the :class:`CombineGlobally` transform.</span>
<span class="sd"> Note that the positional and keyword arguments will be processed in order</span>
<span class="sd"> to detect :class:`~apache_beam.pvalue.PValue` s that will be computed as side</span>
<span class="sd"> inputs to the transform.</span>
<span class="sd"> During pipeline execution whenever the :class:`CombineFn` object gets executed</span>
<span class="sd"> (i.e. any of the :class:`CombineFn` methods get called), the</span>
<span class="sd"> :class:`~apache_beam.pvalue.PValue` arguments will be replaced by their</span>
<span class="sd"> actual value in the exact position where they appear in the argument lists.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">has_defaults</span> <span class="o">=</span> <span class="kc">True</span>
<span class="n">as_view</span> <span class="o">=</span> <span class="kc">False</span>
<span class="n">fanout</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fn</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="n">CombineFn</span><span class="p">)</span> <span class="ow">or</span> <span class="n">callable</span><span class="p">(</span><span class="n">fn</span><span class="p">)):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span>
<span class="s1">&#39;CombineGlobally can be used only with combineFn objects. &#39;</span>
<span class="s1">&#39;Received </span><span class="si">%r</span><span class="s1"> instead.&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">fn</span><span class="p">))</span>
<span class="nb">super</span><span class="p">(</span><span class="n">CombineGlobally</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">fn</span> <span class="o">=</span> <span class="n">fn</span>
<span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="o">=</span> <span class="n">args</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span>
<div class="viewcode-block" id="CombineGlobally.display_data"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineGlobally.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="s1">&#39;combine_fn&#39;</span><span class="p">:</span>
<span class="n">DisplayDataItem</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s1">&#39;Combine Function&#39;</span><span class="p">),</span>
<span class="s1">&#39;combine_fn_dd&#39;</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">}</span></div>
<div class="viewcode-block" id="CombineGlobally.default_label"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineGlobally.default_label">[docs]</a> <span class="k">def</span> <span class="nf">default_label</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s1">&#39;CombineGlobally(</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">label_from_callable</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">)</span></div>
<span class="k">def</span> <span class="nf">_clone</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">extra_attributes</span><span class="p">):</span>
<span class="n">clone</span> <span class="o">=</span> <span class="n">copy</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="n">clone</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">extra_attributes</span><span class="p">)</span>
<span class="k">return</span> <span class="n">clone</span>
<div class="viewcode-block" id="CombineGlobally.with_fanout"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineGlobally.with_fanout">[docs]</a> <span class="k">def</span> <span class="nf">with_fanout</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fanout</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_clone</span><span class="p">(</span><span class="n">fanout</span><span class="o">=</span><span class="n">fanout</span><span class="p">)</span></div>
<div class="viewcode-block" id="CombineGlobally.with_defaults"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineGlobally.with_defaults">[docs]</a> <span class="k">def</span> <span class="nf">with_defaults</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">has_defaults</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_clone</span><span class="p">(</span><span class="n">has_defaults</span><span class="o">=</span><span class="n">has_defaults</span><span class="p">)</span></div>
<div class="viewcode-block" id="CombineGlobally.without_defaults"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineGlobally.without_defaults">[docs]</a> <span class="k">def</span> <span class="nf">without_defaults</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">with_defaults</span><span class="p">(</span><span class="kc">False</span><span class="p">)</span></div>
<div class="viewcode-block" id="CombineGlobally.as_singleton_view"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineGlobally.as_singleton_view">[docs]</a> <span class="k">def</span> <span class="nf">as_singleton_view</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_clone</span><span class="p">(</span><span class="n">as_view</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div>
<div class="viewcode-block" id="CombineGlobally.expand"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineGlobally.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">add_input_types</span><span class="p">(</span><span class="n">transform</span><span class="p">):</span>
<span class="n">type_hints</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_type_hints</span><span class="p">()</span>
<span class="k">if</span> <span class="n">type_hints</span><span class="o">.</span><span class="n">input_types</span><span class="p">:</span>
<span class="k">return</span> <span class="n">transform</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">type_hints</span><span class="o">.</span><span class="n">input_types</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">])</span>
<span class="k">return</span> <span class="n">transform</span>
<span class="n">combine_per_key</span> <span class="o">=</span> <span class="n">CombinePerKey</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">fanout</span><span class="p">:</span>
<span class="n">combine_per_key</span> <span class="o">=</span> <span class="n">combine_per_key</span><span class="o">.</span><span class="n">with_hot_key_fanout</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fanout</span><span class="p">)</span>
<span class="n">combined</span> <span class="o">=</span> <span class="p">(</span><span class="n">pcoll</span>
<span class="o">|</span> <span class="s1">&#39;KeyWithVoid&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">add_input_types</span><span class="p">(</span>
<span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">v</span><span class="p">:</span> <span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">v</span><span class="p">))</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span>
<span class="n">KV</span><span class="p">[</span><span class="kc">None</span><span class="p">,</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">]))</span>
<span class="o">|</span> <span class="s1">&#39;CombinePerKey&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">combine_per_key</span>
<span class="o">|</span> <span class="s1">&#39;UnKey&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">k_v</span><span class="p">:</span> <span class="n">k_v</span><span class="p">[</span><span class="mi">1</span><span class="p">]))</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_defaults</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">as_view</span><span class="p">:</span>
<span class="k">return</span> <span class="n">combined</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_defaults</span><span class="p">:</span>
<span class="n">combine_fn</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">fn</span> <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">,</span> <span class="n">CombineFn</span><span class="p">)</span>
<span class="k">else</span> <span class="n">CombineFn</span><span class="o">.</span><span class="n">from_callable</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">))</span>
<span class="n">default_value</span> <span class="o">=</span> <span class="n">combine_fn</span><span class="o">.</span><span class="n">apply</span><span class="p">([],</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">default_value</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="o">.</span><span class="n">_NO_DEFAULT</span> <span class="c1"># pylint: disable=protected-access</span>
<span class="n">view</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span><span class="n">combined</span><span class="p">,</span> <span class="n">default_value</span><span class="o">=</span><span class="n">default_value</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">as_view</span><span class="p">:</span>
<span class="k">return</span> <span class="n">view</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">if</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">windowing</span><span class="o">.</span><span class="n">windowfn</span> <span class="o">!=</span> <span class="n">GlobalWindows</span><span class="p">():</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;Default values are not yet supported in CombineGlobally() if the &quot;</span>
<span class="s2">&quot;output PCollection is not windowed by GlobalWindows. &quot;</span>
<span class="s2">&quot;Instead, use CombineGlobally().without_defaults() to output &quot;</span>
<span class="s2">&quot;an empty PCollection if the input PCollection is empty, &quot;</span>
<span class="s2">&quot;or CombineGlobally().as_singleton_view() to get the default &quot;</span>
<span class="s2">&quot;output of the CombineFn if the input PCollection is empty.&quot;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">typed</span><span class="p">(</span><span class="n">transform</span><span class="p">):</span>
<span class="c1"># TODO(robertwb): We should infer this.</span>
<span class="k">if</span> <span class="n">combined</span><span class="o">.</span><span class="n">element_type</span><span class="p">:</span>
<span class="k">return</span> <span class="n">transform</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">combined</span><span class="o">.</span><span class="n">element_type</span><span class="p">)</span>
<span class="k">return</span> <span class="n">transform</span>
<span class="k">return</span> <span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span>
<span class="o">|</span> <span class="s1">&#39;DoOnce&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span>
<span class="o">|</span> <span class="s1">&#39;InjectDefault&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">typed</span><span class="p">(</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">_</span><span class="p">,</span> <span class="n">s</span><span class="p">:</span> <span class="n">s</span><span class="p">,</span> <span class="n">view</span><span class="p">)))</span></div></div>
<!-- NOTE(review): generated Sphinx viewcode mirror of apache_beam/transforms/core.py.
Fix the docstring in the Python module and regenerate; do not hand-edit it here.
The "Returns: A PObject" wording looks stale. CombinePerKey.expand pipes pcoll through GroupByKey and
then CombineValues, so the result is presumably a PCollection of per-key combined values.
Confirm and update the docstring upstream. -->
<div class="viewcode-block" id="CombinePerKey"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombinePerKey">[docs]</a><span class="k">class</span> <span class="nc">CombinePerKey</span><span class="p">(</span><span class="n">PTransformWithSideInputs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A per-key Combine transform.</span>
<span class="sd"> Identifies sets of values associated with the same key in the input</span>
<span class="sd"> PCollection, then applies a CombineFn to condense those sets to single</span>
<span class="sd"> values. See documentation in CombineFn for details on the specifics on how</span>
<span class="sd"> CombineFns are applied.</span>
<span class="sd"> Args:</span>
<span class="sd"> pcoll: input pcollection.</span>
<span class="sd"> fn: instance of CombineFn to apply to all values under the same key in</span>
<span class="sd"> pcoll, or a callable whose signature is ``f(iterable, *args, **kwargs)``</span>
<span class="sd"> (e.g., sum, max).</span>
<span class="sd"> *args: arguments and side inputs, passed directly to the CombineFn.</span>
<span class="sd"> **kwargs: arguments and side inputs, passed directly to the CombineFn.</span>
<span class="sd"> Returns:</span>
<span class="sd"> A PObject holding the result of the combine operation.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<!-- with_hot_key_fanout imports curry_combine_fn locally from apache_beam.transforms.combiners.
It binds self.fn to self.args and self.kwargs with that helper.
It then returns a new _CombinePerKeyWithHotKeyFanout built from the curried fn and the given fanout.
NOTE(review): "peform" in the docstring below is a typo for "perform"; fix it in the Python source
and regenerate this page. -->
<div class="viewcode-block" id="CombinePerKey.with_hot_key_fanout"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombinePerKey.with_hot_key_fanout">[docs]</a> <span class="k">def</span> <span class="nf">with_hot_key_fanout</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fanout</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A per-key combine operation like self but with two levels of aggregation.</span>
<span class="sd"> If a given key is produced by too many upstream bundles, the final</span>
<span class="sd"> reduction can become a bottleneck despite partial combining being lifted</span>
<span class="sd"> pre-GroupByKey. In these cases it can be helpful to perform intermediate</span>
<span class="sd"> partial aggregations in parallel and then re-group to peform a final</span>
<span class="sd"> (per-key) combine. This is also useful for high-volume keys in streaming</span>
<span class="sd"> where combiners are not generally lifted for latency reasons.</span>
<span class="sd"> Note that a fanout greater than 1 requires the data to be sent through</span>
<span class="sd"> two GroupByKeys, and a high fanout can also result in more shuffle data</span>
<span class="sd"> due to less per-bundle combining. Setting the fanout for a key at 1 or less</span>
<span class="sd"> places values on the &quot;cold key&quot; path that skip the intermediate level of</span>
<span class="sd"> aggregation.</span>
<span class="sd"> Args:</span>
<span class="sd"> fanout: either an int, for a constant-degree fanout, or a callable</span>
<span class="sd"> mapping keys to a key-specific degree of fanout</span>
<span class="sd"> Returns:</span>
<span class="sd"> A per-key combining PTransform with the specified fanout.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.combiners</span> <span class="k">import</span> <span class="n">curry_combine_fn</span>
<span class="k">return</span> <span class="n">_CombinePerKeyWithHotKeyFanout</span><span class="p">(</span>
<span class="n">curry_combine_fn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">),</span>
<span class="n">fanout</span><span class="p">)</span></div>
<!-- display_data reports two entries:
- 'combine_fn': a DisplayDataItem of the fn's class, labelled 'Combine Function';
- 'combine_fn_dd': the fn object itself, presumably so that its own display data gets nested.
Confirm the nesting against the DisplayData handling. -->
<div class="viewcode-block" id="CombinePerKey.display_data"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombinePerKey.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="s1">&#39;combine_fn&#39;</span><span class="p">:</span>
<span class="n">DisplayDataItem</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="o">.</span><span class="vm">__class__</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="s1">&#39;Combine Function&#39;</span><span class="p">),</span>
<span class="s1">&#39;combine_fn_dd&#39;</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">}</span></div>
<!-- make_fn stores ptransform.label_from_callable(fn) in self._fn_label, which default_label reads.
It returns fn unchanged when fn is already a CombineFn; otherwise it wraps fn with
CombineFn.from_callable. -->
<div class="viewcode-block" id="CombinePerKey.make_fn"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombinePerKey.make_fn">[docs]</a> <span class="k">def</span> <span class="nf">make_fn</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fn</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_fn_label</span> <span class="o">=</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">label_from_callable</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span>
<span class="k">return</span> <span class="n">fn</span> <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="n">CombineFn</span><span class="p">)</span> <span class="k">else</span> <span class="n">CombineFn</span><span class="o">.</span><span class="n">from_callable</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span></div>
<!-- default_label formats the label as 'ClassName(fn label)', using self.__class__.__name__ and
self._fn_label.
It relies on make_fn having already set _fn_label, presumably from the base class constructor;
confirm in PTransformWithSideInputs. -->
<div class="viewcode-block" id="CombinePerKey.default_label"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombinePerKey.default_label">[docs]</a> <span class="k">def</span> <span class="nf">default_label</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s1">&#39;</span><span class="si">%s</span><span class="s1">(</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fn_label</span><span class="p">)</span></div>
<span class="k">def</span> <span class="nf">_process_argspec_fn</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">element</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="kc">None</span>
<div class="viewcode-block" id="CombinePerKey.expand"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombinePerKey.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span> <span class="o">=</span> <span class="n">util</span><span class="o">.</span><span class="n">insert_values_in_args</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">)</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">GroupByKey</span><span class="p">()</span> <span class="o">|</span> <span class="s1">&#39;Combine&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">CombineValues</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span></div>
<div class="viewcode-block" id="CombinePerKey.default_type_hints"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombinePerKey.default_type_hints">[docs]</a> <span class="k">def</span> <span class="nf">default_type_hints</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">hints</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="o">.</span><span class="n">get_type_hints</span><span class="p">()</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="k">if</span> <span class="n">hints</span><span class="o">.</span><span class="n">input_types</span><span class="p">:</span>
<span class="n">K</span> <span class="o">=</span> <span class="n">typehints</span><span class="o">.</span><span class="n">TypeVariable</span><span class="p">(</span><span class="s1">&#39;K&#39;</span><span class="p">)</span>
<span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span> <span class="o">=</span> <span class="n">hints</span><span class="o">.</span><span class="n">input_types</span>
<span class="n">args</span> <span class="o">=</span> <span class="p">(</span><span class="n">typehints</span><span class="o">.</span><span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">]],)</span> <span class="o">+</span> <span class="n">args</span><span class="p">[</span><span class="mi">1</span><span class="p">:]</span>
<span class="n">hints</span><span class="o">.</span><span class="n">set_input_types</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">K</span> <span class="o">=</span> <span class="n">typehints</span><span class="o">.</span><span class="n">Any</span>
<span class="k">if</span> <span class="n">hints</span><span class="o">.</span><span class="n">output_types</span><span class="p">:</span>
<span class="n">main_output_type</span> <span class="o">=</span> <span class="n">hints</span><span class="o">.</span><span class="n">simple_output_type</span><span class="p">(</span><span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="n">hints</span><span class="o">.</span><span class="n">set_output_types</span><span class="p">(</span><span class="n">typehints</span><span class="o">.</span><span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">main_output_type</span><span class="p">])</span>
<span class="k">return</span> <span class="n">hints</span></div>
<div class="viewcode-block" id="CombinePerKey.to_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombinePerKey.to_runner_api_parameter">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.combiners</span> <span class="k">import</span> <span class="n">curry_combine_fn</span>
<span class="n">combine_fn</span> <span class="o">=</span> <span class="n">curry_combine_fn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">combine_fn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">fn</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">COMBINE_PER_KEY</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="n">_combine_payload</span><span class="p">(</span><span class="n">combine_fn</span><span class="p">,</span> <span class="n">context</span><span class="p">))</span></div>
<div class="viewcode-block" id="CombinePerKey.from_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombinePerKey.from_runner_api_parameter">[docs]</a> <span class="nd">@PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">composites</span><span class="o">.</span><span class="n">COMBINE_PER_KEY</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">CombinePayload</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">from_runner_api_parameter</span><span class="p">(</span><span class="n">combine_payload</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">return</span> <span class="n">CombinePerKey</span><span class="p">(</span>
<span class="n">CombineFn</span><span class="o">.</span><span class="n">from_runner_api</span><span class="p">(</span><span class="n">combine_payload</span><span class="o">.</span><span class="n">combine_fn</span><span class="p">,</span> <span class="n">context</span><span class="p">))</span></div></div>
<span class="c1"># TODO(robertwb): Rename to CombineGroupedValues?</span>
<div class="viewcode-block" id="CombineValues"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineValues">[docs]</a><span class="k">class</span> <span class="nc">CombineValues</span><span class="p">(</span><span class="n">PTransformWithSideInputs</span><span class="p">):</span>
<div class="viewcode-block" id="CombineValues.make_fn"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineValues.make_fn">[docs]</a> <span class="k">def</span> <span class="nf">make_fn</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fn</span><span class="p">):</span>
<span class="k">return</span> <span class="n">fn</span> <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="n">CombineFn</span><span class="p">)</span> <span class="k">else</span> <span class="n">CombineFn</span><span class="o">.</span><span class="n">from_callable</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span></div>
<div class="viewcode-block" id="CombineValues.expand"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineValues.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span> <span class="o">=</span> <span class="n">util</span><span class="o">.</span><span class="n">insert_values_in_args</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">)</span>
<span class="n">input_type</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span>
<span class="n">key_type</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">input_type</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">key_type</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="n">input_type</span><span class="o">.</span><span class="n">tuple_types</span>
<span class="n">runtime_type_check</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">_options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">TypeOptions</span><span class="p">)</span><span class="o">.</span><span class="n">runtime_type_check</span><span class="p">)</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">ParDo</span><span class="p">(</span>
<span class="n">CombineValuesDoFn</span><span class="p">(</span><span class="n">key_type</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">,</span> <span class="n">runtime_type_check</span><span class="p">),</span>
<span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span></div>
<div class="viewcode-block" id="CombineValues.to_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineValues.to_runner_api_parameter">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.combiners</span> <span class="k">import</span> <span class="n">curry_combine_fn</span>
<span class="n">combine_fn</span> <span class="o">=</span> <span class="n">curry_combine_fn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">combine_fn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">fn</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">combine_components</span><span class="o">.</span><span class="n">COMBINE_GROUPED_VALUES</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="n">_combine_payload</span><span class="p">(</span><span class="n">combine_fn</span><span class="p">,</span> <span class="n">context</span><span class="p">))</span></div>
<div class="viewcode-block" id="CombineValues.from_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.CombineValues.from_runner_api_parameter">[docs]</a> <span class="nd">@PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">combine_components</span><span class="o">.</span><span class="n">COMBINE_GROUPED_VALUES</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">CombinePayload</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">from_runner_api_parameter</span><span class="p">(</span><span class="n">combine_payload</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">return</span> <span class="n">CombineValues</span><span class="p">(</span>
<span class="n">CombineFn</span><span class="o">.</span><span class="n">from_runner_api</span><span class="p">(</span><span class="n">combine_payload</span><span class="o">.</span><span class="n">combine_fn</span><span class="p">,</span> <span class="n">context</span><span class="p">))</span></div></div>
<span class="k">class</span> <span class="nc">CombineValuesDoFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;DoFn for performing per-key Combine transforms.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_pcoll_type</span><span class="p">,</span> <span class="n">combinefn</span><span class="p">,</span> <span class="n">runtime_type_check</span><span class="p">):</span>
<span class="nb">super</span><span class="p">(</span><span class="n">CombineValuesDoFn</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">combinefn</span> <span class="o">=</span> <span class="n">combinefn</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runtime_type_check</span> <span class="o">=</span> <span class="n">runtime_type_check</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="c1"># Expected elements input to this DoFn are 2-tuples of the form</span>
<span class="c1"># (key, iter), with iter an iterable of all the values associated with key</span>
<span class="c1"># in the input PCollection.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">runtime_type_check</span><span class="p">:</span>
<span class="c1"># Apply the combiner in a single operation rather than artificially</span>
<span class="c1"># breaking it up so that output type violations manifest as TypeCheck</span>
<span class="c1"># errors rather than type errors.</span>
<span class="k">return</span> <span class="p">[</span>
<span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span>
<span class="bp">self</span><span class="o">.</span><span class="n">combinefn</span><span class="o">.</span><span class="n">apply</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">],</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">))]</span>
<span class="c1"># Add the elements into three accumulators (for testing of merge).</span>
<span class="n">elements</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span>
<span class="n">accumulators</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">k</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">3</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">elements</span><span class="p">)</span> <span class="o">&lt;=</span> <span class="n">k</span><span class="p">:</span>
<span class="k">break</span>
<span class="n">accumulators</span><span class="o">.</span><span class="n">append</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">combinefn</span><span class="o">.</span><span class="n">add_inputs</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">combinefn</span><span class="o">.</span><span class="n">create_accumulator</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">),</span>
<span class="n">elements</span><span class="p">[</span><span class="n">k</span><span class="p">::</span><span class="mi">3</span><span class="p">],</span>
<span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">))</span>
<span class="c1"># Merge the accumulators.</span>
<span class="n">accumulator</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">combinefn</span><span class="o">.</span><span class="n">merge_accumulators</span><span class="p">(</span>
<span class="n">accumulators</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="c1"># Convert accumulator to the final result.</span>
<span class="k">return</span> <span class="p">[(</span><span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span>
<span class="bp">self</span><span class="o">.</span><span class="n">combinefn</span><span class="o">.</span><span class="n">extract_output</span><span class="p">(</span><span class="n">accumulator</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">))]</span>
<span class="k">def</span> <span class="nf">default_type_hints</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">hints</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">combinefn</span><span class="o">.</span><span class="n">get_type_hints</span><span class="p">()</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="k">if</span> <span class="n">hints</span><span class="o">.</span><span class="n">input_types</span><span class="p">:</span>
<span class="n">K</span> <span class="o">=</span> <span class="n">typehints</span><span class="o">.</span><span class="n">TypeVariable</span><span class="p">(</span><span class="s1">&#39;K&#39;</span><span class="p">)</span>
<span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span> <span class="o">=</span> <span class="n">hints</span><span class="o">.</span><span class="n">input_types</span>
<span class="n">args</span> <span class="o">=</span> <span class="p">(</span><span class="n">typehints</span><span class="o">.</span><span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">typehints</span><span class="o">.</span><span class="n">Iterable</span><span class="p">[</span><span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">]]],)</span> <span class="o">+</span> <span class="n">args</span><span class="p">[</span><span class="mi">1</span><span class="p">:]</span>
<span class="n">hints</span><span class="o">.</span><span class="n">set_input_types</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">K</span> <span class="o">=</span> <span class="n">typehints</span><span class="o">.</span><span class="n">Any</span>
<span class="k">if</span> <span class="n">hints</span><span class="o">.</span><span class="n">output_types</span><span class="p">:</span>
<span class="n">main_output_type</span> <span class="o">=</span> <span class="n">hints</span><span class="o">.</span><span class="n">simple_output_type</span><span class="p">(</span><span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="n">hints</span><span class="o">.</span><span class="n">set_output_types</span><span class="p">(</span><span class="n">typehints</span><span class="o">.</span><span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">main_output_type</span><span class="p">])</span>
<span class="k">return</span> <span class="n">hints</span>
<span class="k">class</span> <span class="nc">_CombinePerKeyWithHotKeyFanout</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">combine_fn</span><span class="p">,</span> <span class="n">fanout</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_fanout_fn</span> <span class="o">=</span> <span class="p">(</span>
<span class="p">(</span><span class="k">lambda</span> <span class="n">key</span><span class="p">:</span> <span class="n">fanout</span><span class="p">)</span> <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">fanout</span><span class="p">,</span> <span class="nb">int</span><span class="p">)</span> <span class="k">else</span> <span class="n">fanout</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_combine_fn</span> <span class="o">=</span> <span class="n">combine_fn</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="k">import</span> <span class="n">AccumulationMode</span>
<span class="n">combine_fn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_combine_fn</span>
<span class="n">fanout_fn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fanout_fn</span>
<span class="k">class</span> <span class="nc">SplitHotCold</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># Spreading a hot key across all possible sub-keys for all bundles</span>
<span class="c1"># would defeat the goal of not overwhelming downstream reducers</span>
<span class="c1"># (as well as making less efficient use of PGBK combining tables).</span>
<span class="c1"># Instead, each bundle independently makes a consistent choice about</span>
<span class="c1"># which &quot;shard&quot; of a key to send its intermediate results.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_nonce</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">random</span><span class="o">.</span><span class="n">getrandbits</span><span class="p">(</span><span class="mi">31</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="n">key</span><span class="p">,</span> <span class="n">value</span> <span class="o">=</span> <span class="n">element</span>
<span class="n">fanout</span> <span class="o">=</span> <span class="n">fanout_fn</span><span class="p">(</span><span class="n">key</span><span class="p">)</span>
<span class="k">if</span> <span class="n">fanout</span> <span class="o">&lt;=</span> <span class="mi">1</span><span class="p">:</span>
<span class="c1"># Boolean indicates this is not an accumulator.</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="p">(</span><span class="kc">False</span><span class="p">,</span> <span class="n">value</span><span class="p">))</span> <span class="c1"># cold</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span><span class="s1">&#39;hot&#39;</span><span class="p">,</span> <span class="p">((</span><span class="bp">self</span><span class="o">.</span><span class="n">_nonce</span> <span class="o">%</span> <span class="n">fanout</span><span class="p">,</span> <span class="n">key</span><span class="p">),</span> <span class="n">value</span><span class="p">))</span>
<span class="k">class</span> <span class="nc">PreCombineFn</span><span class="p">(</span><span class="n">CombineFn</span><span class="p">):</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">extract_output</span><span class="p">(</span><span class="n">accumulator</span><span class="p">):</span>
<span class="c1"># Boolean indicates this is an accumulator.</span>
<span class="k">return</span> <span class="p">(</span><span class="kc">True</span><span class="p">,</span> <span class="n">accumulator</span><span class="p">)</span>
<span class="n">create_accumulator</span> <span class="o">=</span> <span class="n">combine_fn</span><span class="o">.</span><span class="n">create_accumulator</span>
<span class="n">add_input</span> <span class="o">=</span> <span class="n">combine_fn</span><span class="o">.</span><span class="n">add_input</span>
<span class="n">merge_accumulators</span> <span class="o">=</span> <span class="n">combine_fn</span><span class="o">.</span><span class="n">merge_accumulators</span>
<span class="k">class</span> <span class="nc">PostCombineFn</span><span class="p">(</span><span class="n">CombineFn</span><span class="p">):</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">add_input</span><span class="p">(</span><span class="n">accumulator</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="n">is_accumulator</span><span class="p">,</span> <span class="n">value</span> <span class="o">=</span> <span class="n">element</span>
<span class="k">if</span> <span class="n">is_accumulator</span><span class="p">:</span>
<span class="k">return</span> <span class="n">combine_fn</span><span class="o">.</span><span class="n">merge_accumulators</span><span class="p">([</span><span class="n">accumulator</span><span class="p">,</span> <span class="n">value</span><span class="p">])</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">combine_fn</span><span class="o">.</span><span class="n">add_input</span><span class="p">(</span><span class="n">accumulator</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span>
<span class="n">create_accumulator</span> <span class="o">=</span> <span class="n">combine_fn</span><span class="o">.</span><span class="n">create_accumulator</span>
<span class="n">merge_accumulators</span> <span class="o">=</span> <span class="n">combine_fn</span><span class="o">.</span><span class="n">merge_accumulators</span>
<span class="n">extract_output</span> <span class="o">=</span> <span class="n">combine_fn</span><span class="o">.</span><span class="n">extract_output</span>
<span class="k">def</span> <span class="nf">StripNonce</span><span class="p">(</span><span class="n">nonce_key_value</span><span class="p">):</span>
<span class="p">(</span><span class="n">_</span><span class="p">,</span> <span class="n">key</span><span class="p">),</span> <span class="n">value</span> <span class="o">=</span> <span class="n">nonce_key_value</span>
<span class="k">return</span> <span class="n">key</span><span class="p">,</span> <span class="n">value</span>
<span class="n">cold</span><span class="p">,</span> <span class="n">hot</span> <span class="o">=</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">ParDo</span><span class="p">(</span><span class="n">SplitHotCold</span><span class="p">())</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span><span class="s1">&#39;hot&#39;</span><span class="p">,</span> <span class="n">main</span><span class="o">=</span><span class="s1">&#39;cold&#39;</span><span class="p">)</span>
<span class="n">cold</span><span class="o">.</span><span class="n">element_type</span> <span class="o">=</span> <span class="n">typehints</span><span class="o">.</span><span class="n">Any</span> <span class="c1"># No multi-output type hints.</span>
<span class="n">precombined_hot</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">hot</span>
<span class="c1"># Avoid double counting that may happen with stacked accumulating mode.</span>
<span class="o">|</span> <span class="s1">&#39;WindowIntoDiscarding&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">WindowInto</span><span class="p">(</span>
<span class="n">pcoll</span><span class="o">.</span><span class="n">windowing</span><span class="p">,</span> <span class="n">accumulation_mode</span><span class="o">=</span><span class="n">AccumulationMode</span><span class="o">.</span><span class="n">DISCARDING</span><span class="p">)</span>
<span class="o">|</span> <span class="n">CombinePerKey</span><span class="p">(</span><span class="n">PreCombineFn</span><span class="p">())</span>
<span class="o">|</span> <span class="n">Map</span><span class="p">(</span><span class="n">StripNonce</span><span class="p">)</span>
<span class="o">|</span> <span class="s1">&#39;WindowIntoOriginal&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">WindowInto</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">windowing</span><span class="p">))</span>
<span class="k">return</span> <span class="p">(</span>
<span class="p">(</span><span class="n">cold</span><span class="p">,</span> <span class="n">precombined_hot</span><span class="p">)</span>
<span class="o">|</span> <span class="n">Flatten</span><span class="p">()</span>
<span class="o">|</span> <span class="n">CombinePerKey</span><span class="p">(</span><span class="n">PostCombineFn</span><span class="p">()))</span>
<!-- GroupByKey: this markup appears to be Sphinx viewcode output for the apache_beam.transforms.core module, so fix the Python source and regenerate rather than hand-editing here. ReifyWindows turns each (k, v) element into (k, WindowedValue(v, timestamp, [window])); expand then chains _GroupByKeyOnly and _GroupAlsoByWindow(pcoll.windowing), attaching explicit input/output type hints to each stage only when pcoll.element_type is not None. NOTE(review): the docstring phrase "will result into" should read "will result in" in the Python source. --><div class="viewcode-block" id="GroupByKey"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.GroupByKey">[docs]</a><span class="nd">@typehints</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">typehints</span><span class="o">.</span><span class="n">KV</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">V</span><span class="p">])</span>
<span class="nd">@typehints</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">typehints</span><span class="o">.</span><span class="n">KV</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">typehints</span><span class="o">.</span><span class="n">Iterable</span><span class="p">[</span><span class="n">V</span><span class="p">]])</span>
<span class="k">class</span> <span class="nc">GroupByKey</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A group by key transform.</span>
<span class="sd"> Processes an input PCollection consisting of key/value pairs represented as a</span>
<span class="sd"> tuple pair. The result is a PCollection where values having a common key are</span>
<span class="sd"> grouped together. For example (a, 1), (b, 2), (a, 3) will result into</span>
<span class="sd"> (a, [1, 3]), (b, [2]).</span>
<span class="sd"> The implementation here is used only when run on the local direct runner.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="GroupByKey.ReifyWindows"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.GroupByKey.ReifyWindows">[docs]</a> <span class="k">class</span> <span class="nc">ReifyWindows</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<div class="viewcode-block" id="GroupByKey.ReifyWindows.process"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.GroupByKey.ReifyWindows.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">window</span><span class="o">=</span><span class="n">DoFn</span><span class="o">.</span><span class="n">WindowParam</span><span class="p">,</span>
<span class="n">timestamp</span><span class="o">=</span><span class="n">DoFn</span><span class="o">.</span><span class="n">TimestampParam</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="o">=</span> <span class="n">element</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span><!-- NOTE(review): unpacking a tuple of the wrong length raises ValueError, which this handler does not convert into TypeCheckError; confirm in the Python source whether that is intended. -->
<span class="k">raise</span> <span class="n">TypeCheckError</span><span class="p">(</span><span class="s1">&#39;Input to GroupByKey must be a PCollection with &#39;</span>
<span class="s1">&#39;elements compatible with KV[A, B]&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="p">[(</span><span class="n">k</span><span class="p">,</span> <span class="n">WindowedValue</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">,</span> <span class="p">[</span><span class="n">window</span><span class="p">]))]</span></div>
<div class="viewcode-block" id="GroupByKey.ReifyWindows.infer_output_type"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.GroupByKey.ReifyWindows.infer_output_type">[docs]</a> <span class="k">def</span> <span class="nf">infer_output_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="p">):</span>
<span class="n">key_type</span><span class="p">,</span> <span class="n">value_type</span> <span class="o">=</span> <span class="n">trivial_inference</span><span class="o">.</span><span class="n">key_value_types</span><span class="p">(</span><span class="n">input_type</span><span class="p">)</span>
<span class="k">return</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">KV</span><span class="p">[</span><span class="n">key_type</span><span class="p">,</span> <span class="n">typehints</span><span class="o">.</span><span class="n">WindowedValue</span><span class="p">[</span><span class="n">value_type</span><span class="p">]]]</span></div></div>
<div class="viewcode-block" id="GroupByKey.expand"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.GroupByKey.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="c1"># This code path is only used in the local direct runner. For Dataflow</span>
<span class="c1"># runner execution, the GroupByKey transform is expanded on the service.</span>
<span class="n">input_type</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span>
<span class="k">if</span> <span class="n">input_type</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># Initialize type-hints used below to enforce type-checking and to pass</span>
<span class="c1"># downstream to further PTransforms.</span>
<span class="n">key_type</span><span class="p">,</span> <span class="n">value_type</span> <span class="o">=</span> <span class="n">trivial_inference</span><span class="o">.</span><span class="n">key_value_types</span><span class="p">(</span><span class="n">input_type</span><span class="p">)</span>
<span class="c1"># Enforce the input to a GBK has a KV element type.</span>
<span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span> <span class="o">=</span> <span class="n">KV</span><span class="p">[</span><span class="n">key_type</span><span class="p">,</span> <span class="n">value_type</span><span class="p">]</span>
<span class="n">typecoders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">verify_deterministic</span><span class="p">(</span><!-- Checks that the key coder is deterministic, labelled with this transform's name; this check runs only on the typed path. -->
<span class="n">typecoders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="n">key_type</span><span class="p">),</span>
<span class="s1">&#39;GroupByKey operation &quot;</span><span class="si">%s</span><span class="s1">&quot;&#39;</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">label</span><span class="p">)</span>
<span class="n">reify_output_type</span> <span class="o">=</span> <span class="n">KV</span><span class="p">[</span><span class="n">key_type</span><span class="p">,</span> <span class="n">typehints</span><span class="o">.</span><span class="n">WindowedValue</span><span class="p">[</span><span class="n">value_type</span><span class="p">]]</span>
<span class="n">gbk_input_type</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">KV</span><span class="p">[</span><span class="n">key_type</span><span class="p">,</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">typehints</span><span class="o">.</span><span class="n">WindowedValue</span><span class="p">[</span><span class="n">value_type</span><span class="p">]]])</span>
<span class="n">gbk_output_type</span> <span class="o">=</span> <span class="n">KV</span><span class="p">[</span><span class="n">key_type</span><span class="p">,</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">value_type</span><span class="p">]]</span>
<span class="c1"># pylint: disable=bad-continuation</span>
<span class="k">return</span> <span class="p">(</span><span class="n">pcoll</span>
<span class="o">|</span> <span class="s1">&#39;ReifyWindows&#39;</span> <span class="o">&gt;&gt;</span> <span class="p">(</span><span class="n">ParDo</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">ReifyWindows</span><span class="p">())</span>
<span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">reify_output_type</span><span class="p">))</span>
<span class="o">|</span> <span class="s1">&#39;GroupByKey&#39;</span> <span class="o">&gt;&gt;</span> <span class="p">(</span><span class="n">_GroupByKeyOnly</span><span class="p">()</span>
<span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">reify_output_type</span><span class="p">)</span>
<span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">gbk_input_type</span><span class="p">))</span>
<span class="o">|</span> <span class="p">(</span><span class="s1">&#39;GroupByWindow&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">_GroupAlsoByWindow</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">windowing</span><span class="p">)</span>
<span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">gbk_input_type</span><span class="p">)</span>
<span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">gbk_output_type</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># The input_type is None, run the default</span>
<span class="k">return</span> <span class="p">(</span><span class="n">pcoll</span>
<span class="o">|</span> <span class="s1">&#39;ReifyWindows&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ParDo</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">ReifyWindows</span><span class="p">())</span>
<span class="o">|</span> <span class="s1">&#39;GroupByKey&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">_GroupByKeyOnly</span><span class="p">()</span>
<span class="o">|</span> <span class="s1">&#39;GroupByWindow&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">_GroupAlsoByWindow</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">windowing</span><span class="p">))</span></div>
<div class="viewcode-block" id="GroupByKey.to_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.GroupByKey.to_runner_api_parameter">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_context</span><span class="p">):</span>
<span class="k">return</span> <span class="n">common_urns</span><span class="o">.</span><span class="n">primitives</span><span class="o">.</span><span class="n">GROUP_BY_KEY</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> <span class="kc">None</span></div>
<div class="viewcode-block" id="GroupByKey.from_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.GroupByKey.from_runner_api_parameter">[docs]</a> <span class="nd">@PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span><span class="n">common_urns</span><span class="o">.</span><span class="n">primitives</span><span class="o">.</span><span class="n">GROUP_BY_KEY</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span><!-- Registered for the same GROUP_BY_KEY URN (with a None payload) that to_runner_api_parameter emits; rebuilds a plain GroupByKey(). -->
<span class="k">def</span> <span class="nf">from_runner_api_parameter</span><span class="p">(</span><span class="n">unused_payload</span><span class="p">,</span> <span class="n">unused_context</span><span class="p">):</span>
<span class="k">return</span> <span class="n">GroupByKey</span><span class="p">()</span></div></div>
<!-- _GroupByKeyOnly: rendered source of the window-agnostic group-by-key step. infer_output_type maps KV[K, V] to KV[K, Iterable[V]]; expand only validates its input via _check_pcollection and returns a new PCollection on the same pipeline (presumably the runner supplies the actual grouping; confirm). --><span class="nd">@typehints</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">typehints</span><span class="o">.</span><span class="n">KV</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">V</span><span class="p">])</span>
<span class="nd">@typehints</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">typehints</span><span class="o">.</span><span class="n">KV</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">typehints</span><span class="o">.</span><span class="n">Iterable</span><span class="p">[</span><span class="n">V</span><span class="p">]])</span>
<span class="k">class</span> <span class="nc">_GroupByKeyOnly</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A group by key transform, ignoring windows.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">infer_output_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="p">):</span>
<span class="n">key_type</span><span class="p">,</span> <span class="n">value_type</span> <span class="o">=</span> <span class="n">trivial_inference</span><span class="o">.</span><span class="n">key_value_types</span><span class="p">(</span><span class="n">input_type</span><span class="p">)</span>
<span class="k">return</span> <span class="n">KV</span><span class="p">[</span><span class="n">key_type</span><span class="p">,</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">value_type</span><span class="p">]]</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_check_pcollection</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)</span>
<span class="k">return</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span>
<!-- _GroupAlsoByWindow: rendered source of a ParDo subclass that wraps _GroupAlsoByWindowDoFn(windowing) and also keeps the windowing on the instance. Its own expand validates the input via _check_pcollection and returns a new PCollection on the same pipeline. Per its decorators, it takes and produces KV[K, Iterable[V]]. --><span class="nd">@typehints</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">typehints</span><span class="o">.</span><span class="n">KV</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">typehints</span><span class="o">.</span><span class="n">Iterable</span><span class="p">[</span><span class="n">V</span><span class="p">]])</span>
<span class="nd">@typehints</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">typehints</span><span class="o">.</span><span class="n">KV</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">typehints</span><span class="o">.</span><span class="n">Iterable</span><span class="p">[</span><span class="n">V</span><span class="p">]])</span>
<span class="k">class</span> <span class="nc">_GroupAlsoByWindow</span><span class="p">(</span><span class="n">ParDo</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;The GroupAlsoByWindow transform.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">windowing</span><span class="p">):</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_GroupAlsoByWindow</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">_GroupAlsoByWindowDoFn</span><span class="p">(</span><span class="n">windowing</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">windowing</span> <span class="o">=</span> <span class="n">windowing</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_check_pcollection</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)</span>
<span class="k">return</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span>
<!-- _GroupAlsoByWindowDoFn: rendered source of the DoFn wrapped by _GroupAlsoByWindow. start_bundle builds a trigger driver from the stored windowing; the meaning of the second argument (True) is not visible here, so check create_trigger_driver. process unpacks each (key, values) element and returns driver.process_entire_key(key, values). infer_output_type unwraps Iterable[WindowedValue[V]] to V through two .inner_type lookups. --><span class="k">class</span> <span class="nc">_GroupAlsoByWindowDoFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="c1"># TODO(robertwb): Support combiner lifting.</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">windowing</span><span class="p">):</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_GroupAlsoByWindowDoFn</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">windowing</span> <span class="o">=</span> <span class="n">windowing</span>
<span class="k">def</span> <span class="nf">infer_output_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="p">):</span>
<span class="n">key_type</span><span class="p">,</span> <span class="n">windowed_value_iter_type</span> <span class="o">=</span> <span class="n">trivial_inference</span><span class="o">.</span><span class="n">key_value_types</span><span class="p">(</span>
<span class="n">input_type</span><span class="p">)</span>
<span class="n">value_type</span> <span class="o">=</span> <span class="n">windowed_value_iter_type</span><span class="o">.</span><span class="n">inner_type</span><span class="o">.</span><span class="n">inner_type</span>
<span class="k">return</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">KV</span><span class="p">[</span><span class="n">key_type</span><span class="p">,</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">value_type</span><span class="p">]]]</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># pylint: disable=wrong-import-order, wrong-import-position</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="k">import</span> <span class="n">create_trigger_driver</span><!-- Imported inside the method, presumably to avoid a circular import with the trigger module; confirm in the Python source. -->
<span class="c1"># pylint: enable=wrong-import-order, wrong-import-position</span>
<span class="bp">self</span><span class="o">.</span><span class="n">driver</span> <span class="o">=</span> <span class="n">create_trigger_driver</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">windowing</span><span class="p">,</span> <span class="kc">True</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="n">k</span><span class="p">,</span> <span class="n">vs</span> <span class="o">=</span> <span class="n">element</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">driver</span><span class="o">.</span><span class="n">process_entire_key</span><span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">vs</span><span class="p">)</span>
<!-- Partition: rendered source (apparently Sphinx viewcode output; edit the Python module and regenerate). ApplyPartitionFnFn calls partitionfn.partition_for(element, n, *args, **kwargs), raises ValueError when the index falls outside [0, n), and otherwise emits the element under the output tag str(partition). make_fn wraps plain callables in CallableWrapperPartitionFn. expand reads n as int(self.args[0]) and declares the tags "0" through str(n - 1) via with_outputs. --><div class="viewcode-block" id="Partition"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Partition">[docs]</a><span class="k">class</span> <span class="nc">Partition</span><span class="p">(</span><span class="n">PTransformWithSideInputs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Split a PCollection into several partitions.</span>
<span class="sd"> Uses the specified PartitionFn to separate an input PCollection into the</span>
<span class="sd"> specified number of sub-PCollections.</span>
<span class="sd"> When apply()d, a Partition() PTransform requires the following:</span>
<span class="sd"> Args:</span>
<span class="sd"> partitionfn: a PartitionFn, or a callable with the signature described in</span>
<span class="sd"> CallableWrapperPartitionFn.</span>
<span class="sd"> n: number of output partitions.</span>
<span class="sd"> The result of this PTransform is a simple list of the output PCollections</span>
<span class="sd"> representing each of n partitions, in order.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="Partition.ApplyPartitionFnFn"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Partition.ApplyPartitionFnFn">[docs]</a> <span class="k">class</span> <span class="nc">ApplyPartitionFnFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A DoFn that applies a PartitionFn.&quot;&quot;&quot;</span>
<div class="viewcode-block" id="Partition.ApplyPartitionFnFn.process"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Partition.ApplyPartitionFnFn.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">partitionfn</span><span class="p">,</span> <span class="n">n</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="n">partition</span> <span class="o">=</span> <span class="n">partitionfn</span><span class="o">.</span><span class="n">partition_for</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">n</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="mi">0</span> <span class="o">&lt;=</span> <span class="n">partition</span> <span class="o">&lt;</span> <span class="n">n</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;PartitionFn specified out-of-bounds partition index: &#39;</span>
<span class="s1">&#39;</span><span class="si">%d</span><span class="s1"> not in [0, </span><span class="si">%d</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">partition</span><span class="p">,</span> <span class="n">n</span><span class="p">))</span>
<span class="c1"># Each input is directed into the output that corresponds to the</span>
<span class="c1"># selected partition.</span>
<span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">partition</span><span class="p">),</span> <span class="n">element</span><span class="p">)</span></div></div>
<div class="viewcode-block" id="Partition.make_fn"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Partition.make_fn">[docs]</a> <span class="k">def</span> <span class="nf">make_fn</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fn</span><span class="p">):</span>
<span class="k">return</span> <span class="n">fn</span> <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="n">PartitionFn</span><span class="p">)</span> <span class="k">else</span> <span class="n">CallableWrapperPartitionFn</span><span class="p">(</span><span class="n">fn</span><span class="p">)</span></div>
<div class="viewcode-block" id="Partition.expand"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Partition.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="n">n</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">ParDo</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">ApplyPartitionFnFn</span><span class="p">(),</span> <span class="bp">self</span><span class="o">.</span><span class="n">fn</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span>
<span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span><span class="o">*</span><span class="p">[</span><span class="nb">str</span><span class="p">(</span><span class="n">t</span><span class="p">)</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="n">n</span><span class="p">)])</span></div></div>
<div class="viewcode-block" id="Windowing"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Windowing">[docs]</a><span class="k">class</span> <span class="nc">Windowing</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">windowfn</span><span class="p">,</span> <span class="n">triggerfn</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">accumulation_mode</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">timestamp_combiner</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">global</span> <span class="n">AccumulationMode</span><span class="p">,</span> <span class="n">DefaultTrigger</span> <span class="c1"># pylint: disable=global-variable-not-assigned</span>
<span class="c1"># pylint: disable=wrong-import-order, wrong-import-position</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="k">import</span> <span class="n">AccumulationMode</span><span class="p">,</span> <span class="n">DefaultTrigger</span>
<span class="c1"># pylint: enable=wrong-import-order, wrong-import-position</span>
<span class="k">if</span> <span class="n">triggerfn</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">triggerfn</span> <span class="o">=</span> <span class="n">DefaultTrigger</span><span class="p">()</span>
<span class="k">if</span> <span class="n">accumulation_mode</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="n">triggerfn</span> <span class="o">==</span> <span class="n">DefaultTrigger</span><span class="p">():</span>
<span class="n">accumulation_mode</span> <span class="o">=</span> <span class="n">AccumulationMode</span><span class="o">.</span><span class="n">DISCARDING</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;accumulation_mode must be provided for non-trivial triggers&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">windowfn</span><span class="o">.</span><span class="n">get_window_coder</span><span class="p">()</span><span class="o">.</span><span class="n">is_deterministic</span><span class="p">():</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;window fn (</span><span class="si">%s</span><span class="s1">) does not have a determanistic coder (</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="p">(</span>
<span class="n">windowfn</span><span class="p">,</span> <span class="n">windowfn</span><span class="o">.</span><span class="n">get_window_coder</span><span class="p">()))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">windowfn</span> <span class="o">=</span> <span class="n">windowfn</span>
<span class="bp">self</span><span class="o">.</span><span class="n">triggerfn</span> <span class="o">=</span> <span class="n">triggerfn</span>
<span class="bp">self</span><span class="o">.</span><span class="n">accumulation_mode</span> <span class="o">=</span> <span class="n">accumulation_mode</span>
<span class="bp">self</span><span class="o">.</span><span class="n">timestamp_combiner</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">timestamp_combiner</span> <span class="ow">or</span> <span class="n">TimestampCombiner</span><span class="o">.</span><span class="n">OUTPUT_AT_EOW</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_is_default</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">windowfn</span> <span class="o">==</span> <span class="n">GlobalWindows</span><span class="p">()</span> <span class="ow">and</span>
<span class="bp">self</span><span class="o">.</span><span class="n">triggerfn</span> <span class="o">==</span> <span class="n">DefaultTrigger</span><span class="p">()</span> <span class="ow">and</span>
<span class="bp">self</span><span class="o">.</span><span class="n">accumulation_mode</span> <span class="o">==</span> <span class="n">AccumulationMode</span><span class="o">.</span><span class="n">DISCARDING</span> <span class="ow">and</span>
<span class="bp">self</span><span class="o">.</span><span class="n">timestamp_combiner</span> <span class="o">==</span> <span class="n">TimestampCombiner</span><span class="o">.</span><span class="n">OUTPUT_AT_EOW</span><span class="p">)</span>
<!-- Windowing.__repr__: formats windowfn, triggerfn, accumulation_mode and timestamp_combiner, in that order, as Windowing(a, b, c, d). --><span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s2">&quot;Windowing(</span><span class="si">%s</span><span class="s2">, </span><span class="si">%s</span><span class="s2">, </span><span class="si">%s</span><span class="s2">, </span><span class="si">%s</span><span class="s2">)&quot;</span> <span class="o">%</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">windowfn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">triggerfn</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">accumulation_mode</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">timestamp_combiner</span><span class="p">)</span>
<!-- Windowing.__eq__: objects of different concrete types are never equal. Two default strategies (the _is_default flag) are equal without comparing fields. Otherwise windowfn, triggerfn, accumulation_mode and timestamp_combiner must all match. NOTE(review): no __ne__ appears alongside this method, and this module appears to target Python 2 (it references `unicode`), where != is not derived from __eq__. Confirm, and add __ne__ in the Python source rather than in this generated page. --><span class="k">def</span> <span class="nf">__eq__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">==</span> <span class="nb">type</span><span class="p">(</span><span class="n">other</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_is_default</span> <span class="ow">and</span> <span class="n">other</span><span class="o">.</span><span class="n">_is_default</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">return</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">windowfn</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">windowfn</span>
<span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">triggerfn</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">triggerfn</span>
<span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">accumulation_mode</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">accumulation_mode</span>
<span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">timestamp_combiner</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">timestamp_combiner</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">False</span>
<!-- Windowing.__hash__: hashes windowfn, accumulation_mode and timestamp_combiner; triggerfn is deliberately left out (presumably because trigger objects need not be hashable; TODO confirm). This stays consistent with __eq__: objects that compare equal agree on all three hashed fields, assuming each field's own hash agrees with its equality. --><span class="k">def</span> <span class="nf">__hash__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">hash</span><span class="p">((</span><span class="bp">self</span><span class="o">.</span><span class="n">windowfn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">accumulation_mode</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">timestamp_combiner</span><span class="p">))</span>
<!-- Windowing.is_default: returns the _is_default flag computed in __init__. The flag is true for GlobalWindows with DefaultTrigger, DISCARDING accumulation and the OUTPUT_AT_EOW timestamp combiner. --><div class="viewcode-block" id="Windowing.is_default"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Windowing.is_default">[docs]</a> <span class="k">def</span> <span class="nf">is_default</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_is_default</span></div>
<!-- Windowing.to_runner_api: serializes this object into a beam_runner_api_pb2.WindowingStrategy proto. merge_status is NEEDS_MERGE when windowfn.is_merging() is true, otherwise NON_MERGING. The window coder is registered through context.coders.get_id. Closing behavior, on-time behavior and allowed_lateness are written as fixed constants (EMIT_ALWAYS, FIRE_ALWAYS, 0), not taken from this object. --><div class="viewcode-block" id="Windowing.to_runner_api"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Windowing.to_runner_api">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">return</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">WindowingStrategy</span><span class="p">(</span>
<span class="n">window_fn</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">windowfn</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">(</span><span class="n">context</span><span class="p">),</span>
<span class="c1"># TODO(robertwb): Prohibit implicit multi-level merging.</span>
<span class="n">merge_status</span><span class="o">=</span><span class="p">(</span><span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">MergeStatus</span><span class="o">.</span><span class="n">NEEDS_MERGE</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">windowfn</span><span class="o">.</span><span class="n">is_merging</span><span class="p">()</span>
<span class="k">else</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">MergeStatus</span><span class="o">.</span><span class="n">NON_MERGING</span><span class="p">),</span>
<span class="n">window_coder_id</span><span class="o">=</span><span class="n">context</span><span class="o">.</span><span class="n">coders</span><span class="o">.</span><span class="n">get_id</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">windowfn</span><span class="o">.</span><span class="n">get_window_coder</span><span class="p">()),</span>
<span class="n">trigger</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">triggerfn</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">(</span><span class="n">context</span><span class="p">),</span>
<span class="n">accumulation_mode</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">accumulation_mode</span><span class="p">,</span>
<span class="n">output_time</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">timestamp_combiner</span><span class="p">,</span>
<span class="c1"># TODO(robertwb): Support EMIT_IF_NONEMPTY</span>
<span class="n">closing_behavior</span><span class="o">=</span><span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">ClosingBehavior</span><span class="o">.</span><span class="n">EMIT_ALWAYS</span><span class="p">,</span>
<!-- NOTE(review): OnTimeBehavior is the only CamelCase keyword in this constructor call. It must match the field name in the generated beam_runner_api_pb2.WindowingStrategy, so confirm it against the proto in use. --><span class="n">OnTimeBehavior</span><span class="o">=</span><span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">OnTimeBehavior</span><span class="o">.</span><span class="n">FIRE_ALWAYS</span><span class="p">,</span>
<span class="n">allowed_lateness</span><span class="o">=</span><span class="mi">0</span><span class="p">)</span></div>
<!-- Windowing.from_runner_api: static inverse of to_runner_api. Rebuilds a Windowing from the proto's window_fn, trigger, accumulation_mode and output_time fields. The constant fields written by to_runner_api (closing behavior, on-time behavior, allowed_lateness) are not read back. TriggerFn is imported inside the function on purpose (note the pylint disable), presumably to avoid a circular import; TODO confirm. --><div class="viewcode-block" id="Windowing.from_runner_api"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Windowing.from_runner_api">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">from_runner_api</span><span class="p">(</span><span class="n">proto</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="c1"># pylint: disable=wrong-import-order, wrong-import-position</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="k">import</span> <span class="n">TriggerFn</span>
<span class="k">return</span> <span class="n">Windowing</span><span class="p">(</span>
<span class="n">windowfn</span><span class="o">=</span><span class="n">WindowFn</span><span class="o">.</span><span class="n">from_runner_api</span><span class="p">(</span><span class="n">proto</span><span class="o">.</span><span class="n">window_fn</span><span class="p">,</span> <span class="n">context</span><span class="p">),</span>
<span class="n">triggerfn</span><span class="o">=</span><span class="n">TriggerFn</span><span class="o">.</span><span class="n">from_runner_api</span><span class="p">(</span><span class="n">proto</span><span class="o">.</span><span class="n">trigger</span><span class="p">,</span> <span class="n">context</span><span class="p">),</span>
<span class="n">accumulation_mode</span><span class="o">=</span><span class="n">proto</span><span class="o">.</span><span class="n">accumulation_mode</span><span class="p">,</span>
<span class="n">timestamp_combiner</span><span class="o">=</span><span class="n">proto</span><span class="o">.</span><span class="n">output_time</span><span class="p">)</span></div></div>
<!-- WindowInto: a ParDo whose DoFn (WindowIntoFn) reassigns each element's windows via windowing.windowfn.assign. The Windowing built in __init__ is returned by get_windowing and serialized by to_runner_api_parameter under the ASSIGN_WINDOWS urn. NOTE(review): this listing is Sphinx viewcode output; the docstring typo fix below ("combiner") should also be made in the Python module (presumably apache_beam/transforms/core.py) so a docs rebuild keeps it. --><div class="viewcode-block" id="WindowInto"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.WindowInto">[docs]</a><span class="nd">@typehints</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">T</span><span class="p">)</span>
<span class="nd">@typehints</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">T</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">WindowInto</span><span class="p">(</span><span class="n">ParDo</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A window transform assigning windows to each element of a PCollection.</span>
<span class="sd"> Transforms an input PCollection by applying a windowing function to each</span>
<span class="sd"> element. Each transformed element in the result will be a WindowedValue</span>
<span class="sd"> element with the same input value and timestamp, with its new set of windows</span>
<span class="sd"> determined by the windowing function.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<!-- WindowIntoFn.process: builds an AssignContext from the element, its timestamp and current window, then yields a WindowedValue that keeps the element and context timestamp and carries the windows returned by assign. --><div class="viewcode-block" id="WindowInto.WindowIntoFn"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.WindowInto.WindowIntoFn">[docs]</a> <span class="k">class</span> <span class="nc">WindowIntoFn</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A DoFn that applies a WindowInto operation.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">windowing</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">windowing</span> <span class="o">=</span> <span class="n">windowing</span>
<div class="viewcode-block" id="WindowInto.WindowIntoFn.process"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.WindowInto.WindowIntoFn.process">[docs]</a> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">=</span><span class="n">DoFn</span><span class="o">.</span><span class="n">TimestampParam</span><span class="p">,</span>
<span class="n">window</span><span class="o">=</span><span class="n">DoFn</span><span class="o">.</span><span class="n">WindowParam</span><span class="p">):</span>
<span class="n">context</span> <span class="o">=</span> <span class="n">WindowFn</span><span class="o">.</span><span class="n">AssignContext</span><span class="p">(</span><span class="n">timestamp</span><span class="p">,</span> <span class="n">element</span><span class="o">=</span><span class="n">element</span><span class="p">,</span>
<span class="n">window</span><span class="o">=</span><span class="n">window</span><span class="p">)</span>
<span class="n">new_windows</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">windowing</span><span class="o">.</span><span class="n">windowfn</span><span class="o">.</span><span class="n">assign</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
<span class="k">yield</span> <span class="n">WindowedValue</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">context</span><span class="o">.</span><span class="n">timestamp</span><span class="p">,</span> <span class="n">new_windows</span><span class="p">)</span></div></div>
<!-- WindowInto.__init__: if windowfn is already a Windowing, its trigger, accumulation_mode and timestamp_combiner become defaults, and explicitly passed kwargs override them. Any kwarg other than those three raises ValueError. --><span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">windowfn</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initializes a WindowInto transform.</span>
<span class="sd"> Args:</span>
<span class="sd"> windowfn: Function to be used for windowing</span>
<span class="sd"> trigger: (optional) Trigger used for windowing, or None for default.</span>
<span class="sd"> accumulation_mode: (optional) Accumulation mode used for windowing,</span>
<span class="sd"> required for non-trivial triggers.</span>
<span class="sd"> timestamp_combiner: (optional) Timestamp combiner used for windowing,</span>
<span class="sd"> or None for default.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">windowfn</span><span class="p">,</span> <span class="n">Windowing</span><span class="p">):</span>
<span class="c1"># Overlay windowing with kwargs.</span>
<span class="n">windowing</span> <span class="o">=</span> <span class="n">windowfn</span>
<span class="n">windowfn</span> <span class="o">=</span> <span class="n">windowing</span><span class="o">.</span><span class="n">windowfn</span>
<span class="c1"># Use windowing to fill in defaults for kwargs.</span>
<span class="n">kwargs</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="nb">dict</span><span class="p">(</span>
<span class="n">trigger</span><span class="o">=</span><span class="n">windowing</span><span class="o">.</span><span class="n">triggerfn</span><span class="p">,</span>
<span class="n">accumulation_mode</span><span class="o">=</span><span class="n">windowing</span><span class="o">.</span><span class="n">accumulation_mode</span><span class="p">,</span>
<span class="n">timestamp_combiner</span><span class="o">=</span><span class="n">windowing</span><span class="o">.</span><span class="n">timestamp_combiner</span><span class="p">),</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="c1"># Use kwargs to simulate keyword-only arguments.</span>
<span class="n">triggerfn</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">&#39;trigger&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="n">accumulation_mode</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">&#39;accumulation_mode&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="n">timestamp_combiner</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">&#39;timestamp_combiner&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">if</span> <span class="n">kwargs</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Unexpected keyword arguments: </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="nb">list</span><span class="p">(</span><span class="n">kwargs</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">windowing</span> <span class="o">=</span> <span class="n">Windowing</span><span class="p">(</span>
<span class="n">windowfn</span><span class="p">,</span> <span class="n">triggerfn</span><span class="p">,</span> <span class="n">accumulation_mode</span><span class="p">,</span> <span class="n">timestamp_combiner</span><span class="p">)</span>
<span class="nb">super</span><span class="p">(</span><span class="n">WindowInto</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">WindowIntoFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">windowing</span><span class="p">))</span>
<div class="viewcode-block" id="WindowInto.get_windowing"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.WindowInto.get_windowing">[docs]</a> <span class="k">def</span> <span class="nf">get_windowing</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_inputs</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">windowing</span></div>
<div class="viewcode-block" id="WindowInto.infer_output_type"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.WindowInto.infer_output_type">[docs]</a> <span class="k">def</span> <span class="nf">infer_output_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="p">):</span>
<span class="k">return</span> <span class="n">input_type</span></div>
<!-- WindowInto.expand: when pcoll.element_type is known, it is set as both the input and output type hint (windowing does not change element types) before delegating to ParDo.expand. --><div class="viewcode-block" id="WindowInto.expand"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.WindowInto.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="n">input_type</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span>
<span class="k">if</span> <span class="n">input_type</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">output_type</span> <span class="o">=</span> <span class="n">input_type</span>
<span class="bp">self</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">input_type</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">output_type</span><span class="p">)</span>
<span class="k">return</span> <span class="nb">super</span><span class="p">(</span><span class="n">WindowInto</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">expand</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)</span></div>
<!-- WindowInto runner API round trip: to_runner_api_parameter emits the ASSIGN_WINDOWS urn with the serialized Windowing. from_runner_api_parameter rebuilds the Windowing and forwards its fields to the WindowInto constructor as keyword arguments. --><div class="viewcode-block" id="WindowInto.to_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.WindowInto.to_runner_api_parameter">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">primitives</span><span class="o">.</span><span class="n">ASSIGN_WINDOWS</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">windowing</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">(</span><span class="n">context</span><span class="p">))</span></div>
<div class="viewcode-block" id="WindowInto.from_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.WindowInto.from_runner_api_parameter">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">from_runner_api_parameter</span><span class="p">(</span><span class="n">proto</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="n">windowing</span> <span class="o">=</span> <span class="n">Windowing</span><span class="o">.</span><span class="n">from_runner_api</span><span class="p">(</span><span class="n">proto</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span>
<span class="k">return</span> <span class="n">WindowInto</span><span class="p">(</span>
<span class="n">windowing</span><span class="o">.</span><span class="n">windowfn</span><span class="p">,</span>
<span class="n">trigger</span><span class="o">=</span><span class="n">windowing</span><span class="o">.</span><span class="n">triggerfn</span><span class="p">,</span>
<span class="n">accumulation_mode</span><span class="o">=</span><span class="n">windowing</span><span class="o">.</span><span class="n">accumulation_mode</span><span class="p">,</span>
<span class="n">timestamp_combiner</span><span class="o">=</span><span class="n">windowing</span><span class="o">.</span><span class="n">timestamp_combiner</span><span class="p">)</span></div></div>
<!-- Registers WindowInto.from_runner_api_parameter as the decoder for the ASSIGN_WINDOWS urn, with beam_runner_api_pb2.WindowingStrategy as the payload type. WindowInto.to_runner_api_parameter emits that urn with a WindowingStrategy produced by Windowing.to_runner_api. --><span class="n">PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span>
<span class="n">common_urns</span><span class="o">.</span><span class="n">primitives</span><span class="o">.</span><span class="n">ASSIGN_WINDOWS</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span>
<span class="c1"># TODO(robertwb): Update WindowIntoPayload to include the full strategy.</span>
<span class="c1"># (Right now only WindowFn is used, but we need this to reconstitute the</span>
<span class="c1"># WindowInto transform, and in the future will need it at runtime to</span>
<span class="c1"># support meta-data driven triggers.)</span>
<span class="c1"># TODO(robertwb): Use a reference rather than embedding?</span>
<span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">WindowingStrategy</span><span class="p">,</span>
<span class="n">WindowInto</span><span class="o">.</span><span class="n">from_runner_api_parameter</span><span class="p">)</span>
<!-- Exposes the nested WindowInto.WindowIntoFn class at module level as WindowIntoFn; the source comment attributes this to pickling limitations with nested classes. --><span class="c1"># Python&#39;s pickling is broken for nested classes.</span>
<span class="n">WindowIntoFn</span> <span class="o">=</span> <span class="n">WindowInto</span><span class="o">.</span><span class="n">WindowIntoFn</span>
<!-- Flatten: merges several PCollections into one. The only accepted keyword argument, pipeline, is stored on self.pipeline; any other keyword raises ValueError. --><div class="viewcode-block" id="Flatten"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Flatten">[docs]</a><span class="k">class</span> <span class="nc">Flatten</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Merges several PCollections into a single PCollection.</span>
<span class="sd"> Copies all elements in 0 or more PCollections into a single output</span>
<span class="sd"> PCollection. If there are no input PCollections, the resulting PCollection</span>
<span class="sd"> will be empty (but see also kwargs below).</span>
<span class="sd"> Args:</span>
<span class="sd"> **kwargs: Accepts a single named argument &quot;pipeline&quot;, which specifies the</span>
<span class="sd"> pipeline that &quot;owns&quot; this PTransform. Ordinarily Flatten can obtain this</span>
<span class="sd"> information from one of the input PCollections, but if there are none (or</span>
<span class="sd"> if there&#39;s a chance there may be none), this argument is the only way to</span>
<span class="sd"> provide pipeline information and should be considered mandatory.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="nb">super</span><span class="p">(</span><span class="n">Flatten</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">&#39;pipeline&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">if</span> <span class="n">kwargs</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Unexpected keyword arguments: </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="nb">list</span><span class="p">(</span><span class="n">kwargs</span><span class="p">))</span>
<!-- Flatten._extract_input_pvalues: converts the input to a tuple, reporting a non-iterable input as ValueError rather than TypeError, and returns that tuple twice (presumably as the pvalueish and its input pvalues per the PTransform contract; TODO confirm). --><span class="k">def</span> <span class="nf">_extract_input_pvalues</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pvalueish</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">pvalueish</span> <span class="o">=</span> <span class="nb">tuple</span><span class="p">(</span><span class="n">pvalueish</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Input to Flatten must be an iterable. &#39;</span>
<span class="s1">&#39;Got a value of type </span><span class="si">%s</span><span class="s1"> instead.&#39;</span> <span class="o">%</span> <span class="nb">type</span><span class="p">(</span><span class="n">pvalueish</span><span class="p">))</span>
<span class="k">return</span> <span class="n">pvalueish</span><span class="p">,</span> <span class="n">pvalueish</span>
<!-- Flatten.expand: checks every input with _check_pcollection, then returns a new PCollection on self.pipeline whose element_type is the typehints.Union of the inputs' element types. NOTE(review): how Union behaves over zero inputs is not visible here; confirm for the empty-input case. --><div class="viewcode-block" id="Flatten.expand"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Flatten.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcolls</span><span class="p">):</span>
<span class="k">for</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">pcolls</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_check_pcollection</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span>
<span class="n">result</span><span class="o">.</span><span class="n">element_type</span> <span class="o">=</span> <span class="n">typehints</span><span class="o">.</span><span class="n">Union</span><span class="p">[</span>
<span class="nb">tuple</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span> <span class="k">for</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">pcolls</span><span class="p">)]</span>
<span class="k">return</span> <span class="n">result</span></div>
<!-- Flatten.get_windowing: with no inputs, returns Windowing(GlobalWindows()); otherwise defers to the base-class get_windowing. --><div class="viewcode-block" id="Flatten.get_windowing"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Flatten.get_windowing">[docs]</a> <span class="k">def</span> <span class="nf">get_windowing</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">inputs</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">inputs</span><span class="p">:</span>
<span class="c1"># TODO(robertwb): Return something compatible with every windowing?</span>
<span class="k">return</span> <span class="n">Windowing</span><span class="p">(</span><span class="n">GlobalWindows</span><span class="p">())</span>
<span class="k">return</span> <span class="nb">super</span><span class="p">(</span><span class="n">Flatten</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">get_windowing</span><span class="p">(</span><span class="n">inputs</span><span class="p">)</span></div>
<!-- Flatten runner API round trip: serialized as the FLATTEN urn with a None payload. from_runner_api_parameter ignores both arguments and returns a fresh Flatten() with no pipeline set. --><div class="viewcode-block" id="Flatten.to_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Flatten.to_runner_api_parameter">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">return</span> <span class="n">common_urns</span><span class="o">.</span><span class="n">primitives</span><span class="o">.</span><span class="n">FLATTEN</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> <span class="kc">None</span></div>
<div class="viewcode-block" id="Flatten.from_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Flatten.from_runner_api_parameter">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">from_runner_api_parameter</span><span class="p">(</span><span class="n">unused_parameter</span><span class="p">,</span> <span class="n">unused_context</span><span class="p">):</span>
<span class="k">return</span> <span class="n">Flatten</span><span class="p">()</span></div></div>
<span class="n">PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span><!-- Module-level registration: associates the FLATTEN URN (parameter type None) with Flatten.from_runner_api_parameter. -->
<span class="n">common_urns</span><span class="o">.</span><span class="n">primitives</span><span class="o">.</span><span class="n">FLATTEN</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> <span class="kc">None</span><span class="p">,</span> <span class="n">Flatten</span><span class="o">.</span><span class="n">from_runner_api_parameter</span><span class="p">)</span>
<div class="viewcode-block" id="Create"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Create">[docs]</a><span class="k">class</span> <span class="nc">Create</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span><!-- Rendered source of Create. __init__ raises TypeError for unicode/str/bytes values, replaces a dict with its items(), and stores the values as a tuple. expand() encodes every value with the registry coder for get_output_type() and reads them back through iobase.Read over a _CreateSource; get_windowing always returns GlobalWindows. Sphinx viewcode output; change the Python module, not this page. -->
<span class="sd">&quot;&quot;&quot;A transform that creates a PCollection from an iterable.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initializes a Create transform.</span>
<span class="sd"> Args:</span>
<span class="sd"> value: An object of values for the PCollection</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nb">super</span><span class="p">(</span><span class="n">Create</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="p">(</span><span class="n">unicode</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="nb">bytes</span><span class="p">)):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">&#39;PTransform Create: Refusing to treat string as &#39;</span>
<span class="s1">&#39;an iterable. (string=</span><span class="si">%r</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="n">value</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span>
<span class="n">value</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="n">items</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">value</span> <span class="o">=</span> <span class="nb">tuple</span><span class="p">(</span><span class="n">value</span><span class="p">)</span>
<div class="viewcode-block" id="Create.to_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Create.to_runner_api_parameter">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="c1"># Required as this is identified by type in PTransformOverrides.</span>
<span class="c1"># TODO(BEAM-3812): Use an actual URN here.</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">to_runner_api_pickled</span><span class="p">(</span><span class="n">context</span><span class="p">)</span></div>
<div class="viewcode-block" id="Create.infer_output_type"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Create.infer_output_type">[docs]</a> <span class="k">def</span> <span class="nf">infer_output_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_input_type</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">value</span><span class="p">:</span>
<span class="k">return</span> <span class="n">Any</span>
<span class="k">return</span> <span class="n">Union</span><span class="p">[[</span><span class="n">trivial_inference</span><span class="o">.</span><span class="n">instance_to_type</span><span class="p">(</span><span class="n">v</span><span class="p">)</span> <span class="k">for</span> <span class="n">v</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">value</span><span class="p">]]</span></div><!-- infer_output_type: Any when no values are stored, otherwise the Union of the per-value inferred types. -->
<div class="viewcode-block" id="Create.get_output_type"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Create.get_output_type">[docs]</a> <span class="k">def</span> <span class="nf">get_output_type</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">get_type_hints</span><span class="p">()</span><span class="o">.</span><span class="n">simple_output_type</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">label</span><span class="p">)</span> <span class="ow">or</span>
<span class="bp">self</span><span class="o">.</span><span class="n">infer_output_type</span><span class="p">(</span><span class="kc">None</span><span class="p">))</span></div>
<div class="viewcode-block" id="Create.expand"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Create.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pbegin</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.io</span> <span class="k">import</span> <span class="n">iobase</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pbegin</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">=</span> <span class="n">pbegin</span><span class="o">.</span><span class="n">pipeline</span>
<span class="n">coder</span> <span class="o">=</span> <span class="n">typecoders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">get_output_type</span><span class="p">())</span>
<span class="n">source</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_create_source_from_iterable</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="n">coder</span><span class="p">)</span>
<span class="k">return</span> <span class="p">(</span><span class="n">pbegin</span><span class="o">.</span><span class="n">pipeline</span>
<span class="o">|</span> <span class="n">iobase</span><span class="o">.</span><span class="n">Read</span><span class="p">(</span><span class="n">source</span><span class="p">)</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">get_output_type</span><span class="p">()))</span></div>
<div class="viewcode-block" id="Create.get_windowing"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Create.get_windowing">[docs]</a> <span class="k">def</span> <span class="nf">get_windowing</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_inputs</span><span class="p">):</span>
<span class="k">return</span> <span class="n">Windowing</span><span class="p">(</span><span class="n">GlobalWindows</span><span class="p">())</span></div>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">_create_source_from_iterable</span><span class="p">(</span><span class="n">values</span><span class="p">,</span> <span class="n">coder</span><span class="p">):</span>
<span class="k">return</span> <span class="n">Create</span><span class="o">.</span><span class="n">_create_source</span><span class="p">(</span><span class="nb">list</span><span class="p">(</span><span class="nb">map</span><span class="p">(</span><span class="n">coder</span><span class="o">.</span><span class="n">encode</span><span class="p">,</span> <span class="n">values</span><span class="p">)),</span> <span class="n">coder</span><span class="p">)</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">_create_source</span><span class="p">(</span><span class="n">serialized_values</span><span class="p">,</span> <span class="n">coder</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.create_source</span> <span class="k">import</span> <span class="n">_CreateSource</span>
<span class="k">return</span> <span class="n">_CreateSource</span><span class="p">(</span><span class="n">serialized_values</span><span class="p">,</span> <span class="n">coder</span><span class="p">)</span></div>
<div class="viewcode-block" id="Impulse"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Impulse">[docs]</a><span class="k">class</span> <span class="nc">Impulse</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span><!-- Rendered source of Impulse: expand() requires a PBegin and returns a new PCollection on the same pipeline; the output type is bytes, windowing is GlobalWindows, and from_runner_api_parameter is registered for the IMPULSE URN through the register_urn decorator. Sphinx viewcode output; change the Python module, not this page. -->
<span class="sd">&quot;&quot;&quot;Impulse primitive.&quot;&quot;&quot;</span>
<div class="viewcode-block" id="Impulse.expand"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Impulse.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pbegin</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pbegin</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span>
<span class="s1">&#39;Input to Impulse transform must be a PBegin but found </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">pbegin</span><span class="p">)</span><!-- NOTE(review): the message is built with "% pbegin". If pbegin is a tuple, % treats it as the argument tuple, so formatting either fails or reports the wrong value. The upstream fix is "% (pbegin,)" in the Python module; do not hand-edit this generated page. -->
<span class="k">return</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">(</span><span class="n">pbegin</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span></div>
<div class="viewcode-block" id="Impulse.get_windowing"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Impulse.get_windowing">[docs]</a> <span class="k">def</span> <span class="nf">get_windowing</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">inputs</span><span class="p">):</span>
<span class="k">return</span> <span class="n">Windowing</span><span class="p">(</span><span class="n">GlobalWindows</span><span class="p">())</span></div>
<div class="viewcode-block" id="Impulse.infer_output_type"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Impulse.infer_output_type">[docs]</a> <span class="k">def</span> <span class="nf">infer_output_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_input_type</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">bytes</span></div>
<div class="viewcode-block" id="Impulse.to_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Impulse.to_runner_api_parameter">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_context</span><span class="p">):</span>
<span class="k">return</span> <span class="n">common_urns</span><span class="o">.</span><span class="n">primitives</span><span class="o">.</span><span class="n">IMPULSE</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> <span class="kc">None</span></div>
<div class="viewcode-block" id="Impulse.from_runner_api_parameter"><a class="viewcode-back" href="../../../apache_beam.transforms.core.html#apache_beam.transforms.core.Impulse.from_runner_api_parameter">[docs]</a> <span class="nd">@PTransform</span><span class="o">.</span><span class="n">register_urn</span><span class="p">(</span><span class="n">common_urns</span><span class="o">.</span><span class="n">primitives</span><span class="o">.</span><span class="n">IMPULSE</span><span class="o">.</span><span class="n">urn</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">from_runner_api_parameter</span><span class="p">(</span><span class="n">unused_parameter</span><span class="p">,</span> <span class="n">unused_context</span><span class="p">):</span>
<span class="k">return</span> <span class="n">Impulse</span><span class="p">()</span></div></div>
</pre></div>
</div>
<div class="articleComments">
</div>
</div>
<footer>
  <hr>
  <!-- NOTE(review): the copyright holder below is empty. It presumably comes
       from an unset `copyright` value in the Sphinx conf.py; fix it there,
       because this page is regenerated on every docs build. -->
  <div role="contentinfo">
    <p>
      © Copyright .
    </p>
  </div>
  Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
  // Page configuration for Sphinx's doctools.js, which is loaded further down.
  // Kept as a top-level `var` so it is a true global that is already defined
  // when the later scripts run.
  var DOCUMENTATION_OPTIONS = {
    URL_ROOT: '../../../',
    VERSION: '',
    COLLAPSE_INDEX: false,
    FILE_SUFFIX: '.html',
    HAS_SOURCE: true,
    SOURCELINK_SUFFIX: '.txt'
  };
</script>
<!-- Load order matters: underscore/doctools/theme build on jQuery, and the
     inline call below runs immediately. Do not add async/defer to these
     without also moving that call. -->
<script src="../../../_static/jquery.js"></script>
<script src="../../../_static/underscore.js"></script>
<script src="../../../_static/doctools.js"></script>
<script src="../../../_static/js/theme.js"></script>
<script>
  // jQuery(fn) runs fn once the DOM is ready; it turns on the theme's
  // StickyNav behaviour provided by theme.js.
  jQuery(function () {
    SphinxRtdTheme.StickyNav.enable();
  });
</script>
</body>
</html>