blob: b73180b43df35d32534cfefebab269c9ce5b7e42 [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>apache_beam.runners.direct.transform_evaluator &mdash; Apache Beam documentation</title>
  <!-- Theme stylesheet; text/css is already the default for rel="stylesheet". -->
  <link rel="stylesheet" href="../../../../_static/css/theme.css">
  <!-- Document relations: general index, search page, docs root, and parent "Module code" page. -->
  <link rel="index" href="../../../../genindex.html" title="Index">
  <link rel="search" href="../../../../search.html" title="Search">
  <link rel="top" href="../../../../index.html" title="Apache Beam documentation">
  <link rel="up" href="../../../index.html" title="Module code">
  <!-- Loaded without defer/async, presumably so it can swap the "no-js" class on <html> before first paint; confirm before changing. -->
  <script src="../../../../_static/js/modernizr.min.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side" aria-label="Sidebar">
  <div class="wy-side-scroll">
    <div class="wy-side-nav-search">
      <a href="../../../../index.html" class="icon icon-home"> Apache Beam
      </a>
      <div role="search">
        <form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get">
          <!-- placeholder disappears on input and is not a reliable accessible name, so name the field explicitly.
               type="text" is kept because the theme's stylesheet may target input[type=text]. -->
          <input type="text" name="q" placeholder="Search docs" aria-label="Search docs">
          <input type="hidden" name="check_keywords" value="yes">
          <input type="hidden" name="area" value="default">
        </form>
      </div>
    </div>
    <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
      <!-- Top-level packages. -->
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.coders.html">apache_beam.coders package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.internal.html">apache_beam.internal package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.io.html">apache_beam.io package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.options.html">apache_beam.options package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.portability.html">apache_beam.portability package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.runners.html">apache_beam.runners package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.testing.html">apache_beam.testing package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.tools.html">apache_beam.tools package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.utils.html">apache_beam.utils package</a></li>
      </ul>
      <!-- Top-level modules. -->
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.error.html">apache_beam.error module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.version.html">apache_beam.version module</a></li>
      </ul>
    </div>
  </div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../../index.html">Module code</a> &raquo;</li>
<li>apache_beam.runners.direct.transform_evaluator</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.runners.direct.transform_evaluator</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;An evaluator of a specific application of a transform.&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">absolute_import</span>
<span class="kn">import</span> <span class="nn">collections</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">random</span>
<span class="kn">import</span> <span class="nn">time</span>
<span class="kn">import</span> <span class="nn">typing</span>
<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">object</span>
<span class="kn">from</span> <span class="nn">future.utils</span> <span class="k">import</span> <span class="n">iteritems</span>
<span class="kn">import</span> <span class="nn">apache_beam.io</span> <span class="k">as</span> <span class="nn">io</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="k">import</span> <span class="n">coders</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="k">import</span> <span class="n">pvalue</span>
<span class="kn">from</span> <span class="nn">apache_beam.internal</span> <span class="k">import</span> <span class="n">pickler</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners</span> <span class="k">import</span> <span class="n">common</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.common</span> <span class="k">import</span> <span class="n">DoFnRunner</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.common</span> <span class="k">import</span> <span class="n">DoFnState</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.dataflow.native_io.iobase</span> <span class="k">import</span> <span class="n">_NativeWrite</span> <span class="c1"># pylint: disable=protected-access</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.direct_runner</span> <span class="k">import</span> <span class="n">_DirectReadFromPubSub</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.direct_runner</span> <span class="k">import</span> <span class="n">_StreamingGroupAlsoByWindow</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.direct_runner</span> <span class="k">import</span> <span class="n">_StreamingGroupByKeyOnly</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.direct_userstate</span> <span class="k">import</span> <span class="n">DirectUserStateContext</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.sdf_direct_runner</span> <span class="k">import</span> <span class="n">ProcessElements</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.sdf_direct_runner</span> <span class="k">import</span> <span class="n">ProcessFn</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.sdf_direct_runner</span> <span class="k">import</span> <span class="n">SDFProcessElementInvoker</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.util</span> <span class="k">import</span> <span class="n">KeyedWorkItem</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.util</span> <span class="k">import</span> <span class="n">TransformResult</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.watermark_manager</span> <span class="k">import</span> <span class="n">WatermarkManager</span>
<span class="kn">from</span> <span class="nn">apache_beam.testing.test_stream</span> <span class="k">import</span> <span class="n">ElementEvent</span>
<span class="kn">from</span> <span class="nn">apache_beam.testing.test_stream</span> <span class="k">import</span> <span class="n">ProcessingTimeEvent</span>
<span class="kn">from</span> <span class="nn">apache_beam.testing.test_stream</span> <span class="k">import</span> <span class="n">TestStream</span>
<span class="kn">from</span> <span class="nn">apache_beam.testing.test_stream</span> <span class="k">import</span> <span class="n">WatermarkEvent</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="k">import</span> <span class="n">core</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="k">import</span> <span class="n">TimeDomain</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="k">import</span> <span class="n">_CombiningValueStateTag</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="k">import</span> <span class="n">_ListStateTag</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="k">import</span> <span class="n">create_trigger_driver</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.userstate</span> <span class="k">import</span> <span class="n">get_dofn_specs</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.userstate</span> <span class="k">import</span> <span class="n">is_stateful_dofn</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="k">import</span> <span class="n">GlobalWindows</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="k">import</span> <span class="n">WindowedValue</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints.typecheck</span> <span class="k">import</span> <span class="n">TypeCheckError</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils</span> <span class="k">import</span> <span class="n">counters</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils.timestamp</span> <span class="k">import</span> <span class="n">MIN_TIMESTAMP</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils.timestamp</span> <span class="k">import</span> <span class="n">Timestamp</span>
<div class="viewcode-block" id="TransformEvaluatorRegistry"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.TransformEvaluatorRegistry">[docs]</a><span class="k">class</span> <span class="nc">TransformEvaluatorRegistry</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;For internal use only; no backwards-compatibility guarantees.</span>
<span class="sd"> Creates instances of TransformEvaluator for the application of a transform.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">_test_evaluators_overrides</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">):</span>
<span class="k">assert</span> <span class="n">evaluation_context</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span> <span class="o">=</span> <span class="n">evaluation_context</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluators</span> <span class="o">=</span> <span class="p">{</span>
<span class="n">io</span><span class="o">.</span><span class="n">Read</span><span class="p">:</span> <span class="n">_BoundedReadEvaluator</span><span class="p">,</span>
<span class="n">_DirectReadFromPubSub</span><span class="p">:</span> <span class="n">_PubSubReadEvaluator</span><span class="p">,</span>
<span class="n">core</span><span class="o">.</span><span class="n">Flatten</span><span class="p">:</span> <span class="n">_FlattenEvaluator</span><span class="p">,</span>
<span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">:</span> <span class="n">_ParDoEvaluator</span><span class="p">,</span>
<span class="n">core</span><span class="o">.</span><span class="n">_GroupByKeyOnly</span><span class="p">:</span> <span class="n">_GroupByKeyOnlyEvaluator</span><span class="p">,</span>
<span class="n">_StreamingGroupByKeyOnly</span><span class="p">:</span> <span class="n">_StreamingGroupByKeyOnlyEvaluator</span><span class="p">,</span>
<span class="n">_StreamingGroupAlsoByWindow</span><span class="p">:</span> <span class="n">_StreamingGroupAlsoByWindowEvaluator</span><span class="p">,</span>
<span class="n">_NativeWrite</span><span class="p">:</span> <span class="n">_NativeWriteEvaluator</span><span class="p">,</span>
<span class="n">TestStream</span><span class="p">:</span> <span class="n">_TestStreamEvaluator</span><span class="p">,</span>
<span class="n">ProcessElements</span><span class="p">:</span> <span class="n">_ProcessElementsEvaluator</span>
<span class="p">}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluators</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_test_evaluators_overrides</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_root_bundle_providers</span> <span class="o">=</span> <span class="p">{</span>
<span class="n">core</span><span class="o">.</span><span class="n">PTransform</span><span class="p">:</span> <span class="n">DefaultRootBundleProvider</span><span class="p">,</span>
<span class="n">TestStream</span><span class="p">:</span> <span class="n">_TestStreamRootBundleProvider</span><span class="p">,</span>
<span class="p">}</span>
<div class="viewcode-block" id="TransformEvaluatorRegistry.get_evaluator"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.TransformEvaluatorRegistry.get_evaluator">[docs]</a> <span class="k">def</span> <span class="nf">get_evaluator</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span> <span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns a TransformEvaluator suitable for processing given inputs.&quot;&quot;&quot;</span>
<span class="k">assert</span> <span class="n">applied_ptransform</span>
<span class="k">assert</span> <span class="nb">bool</span><span class="p">(</span><span class="n">applied_ptransform</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">)</span> <span class="o">==</span> <span class="nb">bool</span><span class="p">(</span><span class="n">side_inputs</span><span class="p">)</span>
<span class="c1"># Walk up the class hierarchy to find an evaluable type. This is necessary</span>
<span class="c1"># for supporting sub-classes of core transforms.</span>
<span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="n">mro</span><span class="p">():</span>
<span class="n">evaluator</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluators</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span>
<span class="k">if</span> <span class="n">evaluator</span><span class="p">:</span>
<span class="k">break</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">evaluator</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span>
<span class="s1">&#39;Execution of [</span><span class="si">%s</span><span class="s1">] not implemented in runner </span><span class="si">%s</span><span class="s1">.&#39;</span> <span class="o">%</span> <span class="p">(</span>
<span class="nb">type</span><span class="p">(</span><span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="p">),</span> <span class="bp">self</span><span class="p">))</span>
<span class="k">return</span> <span class="n">evaluator</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">)</span></div>
<div class="viewcode-block" id="TransformEvaluatorRegistry.get_root_bundle_provider"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.TransformEvaluatorRegistry.get_root_bundle_provider">[docs]</a> <span class="k">def</span> <span class="nf">get_root_bundle_provider</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">):</span>
<span class="n">provider_cls</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="n">mro</span><span class="p">():</span>
<span class="n">provider_cls</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_root_bundle_providers</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span>
<span class="k">if</span> <span class="n">provider_cls</span><span class="p">:</span>
<span class="k">break</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">provider_cls</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span>
<span class="s1">&#39;Root provider for [</span><span class="si">%s</span><span class="s1">] not implemented in runner </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span>
<span class="nb">type</span><span class="p">(</span><span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="p">),</span> <span class="bp">self</span><span class="p">))</span>
<span class="k">return</span> <span class="n">provider_cls</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">)</span></div>
<div class="viewcode-block" id="TransformEvaluatorRegistry.should_execute_serially"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.TransformEvaluatorRegistry.should_execute_serially">[docs]</a> <span class="k">def</span> <span class="nf">should_execute_serially</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns True if this applied_ptransform should run one bundle at a time.</span>
<span class="sd"> Some TransformEvaluators use a global state object to keep track of their</span>
<span class="sd"> global execution state. For example evaluator for _GroupByKeyOnly uses this</span>
<span class="sd"> state as an in memory dictionary to buffer keys.</span>
<span class="sd"> Serially executed evaluators will act as syncing point in the graph and</span>
<span class="sd"> execution will not move forward until they receive all of their inputs. Once</span>
<span class="sd"> they receive all of their input, they will release the combined output.</span>
<span class="sd"> Their output may consist of multiple bundles as they may divide their output</span>
<span class="sd"> into pieces before releasing.</span>
<span class="sd"> Args:</span>
<span class="sd"> applied_ptransform: Transform to be used for execution.</span>
<span class="sd"> Returns:</span>
<span class="sd"> True if executor should execute applied_ptransform serially.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="p">,</span>
<span class="p">(</span><span class="n">core</span><span class="o">.</span><span class="n">_GroupByKeyOnly</span><span class="p">,</span>
<span class="n">_StreamingGroupByKeyOnly</span><span class="p">,</span>
<span class="n">_StreamingGroupAlsoByWindow</span><span class="p">,</span>
<span class="n">_NativeWrite</span><span class="p">)):</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">elif</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="p">,</span> <span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">)</span> <span class="ow">and</span>
<span class="n">is_stateful_dofn</span><span class="p">(</span><span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">dofn</span><span class="p">)):</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">return</span> <span class="kc">False</span></div></div>
<div class="viewcode-block" id="RootBundleProvider"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.RootBundleProvider">[docs]</a><span class="k">class</span> <span class="nc">RootBundleProvider</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Provides bundles for the initial execution of a root transform.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span> <span class="o">=</span> <span class="n">evaluation_context</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span> <span class="o">=</span> <span class="n">applied_ptransform</span>
<div class="viewcode-block" id="RootBundleProvider.get_root_bundles"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.RootBundleProvider.get_root_bundles">[docs]</a> <span class="k">def</span> <span class="nf">get_root_bundles</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div>
<div class="viewcode-block" id="DefaultRootBundleProvider"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.DefaultRootBundleProvider">[docs]</a><span class="k">class</span> <span class="nc">DefaultRootBundleProvider</span><span class="p">(</span><span class="n">RootBundleProvider</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Provides an empty bundle by default for root transforms.&quot;&quot;&quot;</span>
<div class="viewcode-block" id="DefaultRootBundleProvider.get_root_bundles"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.DefaultRootBundleProvider.get_root_bundles">[docs]</a> <span class="k">def</span> <span class="nf">get_root_bundles</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">input_node</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span>
<span class="n">empty_bundle</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_empty_committed_bundle</span><span class="p">(</span><span class="n">input_node</span><span class="p">))</span>
<span class="k">return</span> <span class="p">[</span><span class="n">empty_bundle</span><span class="p">]</span></div></div>
<span class="k">class</span> <span class="nc">_TestStreamRootBundleProvider</span><span class="p">(</span><span class="n">RootBundleProvider</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Provides an initial bundle for the TestStream evaluator.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">get_root_bundles</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">test_stream</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">test_stream</span><span class="o">.</span><span class="n">events</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">:</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span>
<span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">pipeline</span><span class="p">))</span>
<span class="c1"># Explicitly set timestamp to MIN_TIMESTAMP to ensure that we hold the</span>
<span class="c1"># watermark.</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">=</span><span class="n">MIN_TIMESTAMP</span><span class="p">))</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">commit</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span>
<span class="n">bundles</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">bundle</span><span class="p">)</span>
<span class="k">return</span> <span class="n">bundles</span>
<span class="k">class</span> <span class="nc">_TransformEvaluator</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;An evaluator of a specific application of a transform.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span> <span class="o">=</span> <span class="n">evaluation_context</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span> <span class="o">=</span> <span class="n">applied_ptransform</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_input_committed_bundle</span> <span class="o">=</span> <span class="n">input_committed_bundle</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_side_inputs</span> <span class="o">=</span> <span class="n">side_inputs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_expand_outputs</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_execution_context</span> <span class="o">=</span> <span class="n">evaluation_context</span><span class="o">.</span><span class="n">get_execution_context</span><span class="p">(</span>
<span class="n">applied_ptransform</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_execution_context</span><span class="o">.</span><span class="n">get_step_context</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">_expand_outputs</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- _expand_outputs: collects every output PValue of the applied transform
into the frozenset self._outputs. A DoOutputsTuple output is flattened into
its member PValues; any other output value is added as is.
-->
<span class="n">outputs</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="k">for</span> <span class="n">pval</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">outputs</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pval</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">DoOutputsTuple</span><span class="p">):</span>
<span class="n">pvals</span> <span class="o">=</span> <span class="p">(</span><span class="n">v</span> <span class="k">for</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">pval</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">pvals</span> <span class="o">=</span> <span class="p">(</span><span class="n">pval</span><span class="p">,)</span>
<span class="k">for</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">pvals</span><span class="p">:</span>
<span class="n">outputs</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">v</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span> <span class="o">=</span> <span class="nb">frozenset</span><span class="p">(</span><span class="n">outputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_split_list_into_bundles</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">output_pcollection</span><span class="p">,</span> <span class="n">elements</span><span class="p">,</span> <span class="n">max_element_per_bundle</span><span class="p">,</span>
<span class="n">element_size_fn</span><span class="p">):</span><!-- Review notes on _split_list_into_bundles: the first bundle is created
before iterating, so an empty elements iterable yields exactly one empty
bundle. A new bundle is opened only when the running size has already
reached the limit, so a bundle can overshoot max_element_per_bundle when
element sizes are greater than one. Any falsy limit (None or 0) disables
splitting.
-->
<span class="sd">&quot;&quot;&quot;Splits elements, an iterable, into multiple output bundles.</span>
<span class="sd"> Args:</span>
<span class="sd"> output_pcollection: PCollection that the elements belong to.</span>
<span class="sd"> elements: elements to be chunked into bundles.</span>
<span class="sd"> max_element_per_bundle: (approximately) the maximum element per bundle.</span>
<span class="sd"> If it is None, only a single bundle will be produced.</span>
<span class="sd"> element_size_fn: Function to return the size of a given element.</span>
<span class="sd"> Returns:</span>
<span class="sd"> List of output uncommitted bundles with at least one bundle.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">output_pcollection</span><span class="p">)</span>
<span class="n">bundle_size</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[</span><span class="n">bundle</span><span class="p">]</span>
<span class="k">for</span> <span class="n">element</span> <span class="ow">in</span> <span class="n">elements</span><span class="p">:</span>
<span class="k">if</span> <span class="n">max_element_per_bundle</span> <span class="ow">and</span> <span class="n">bundle_size</span> <span class="o">&gt;=</span> <span class="n">max_element_per_bundle</span><span class="p">:</span><!-- size is checked before adding, so the limit applies to the bundle being filled -->
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">output_pcollection</span><span class="p">)</span>
<span class="n">bundle_size</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">bundles</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">bundle</span><span class="p">)</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">output</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="n">bundle_size</span> <span class="o">+=</span> <span class="n">element_size_fn</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="k">return</span> <span class="n">bundles</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Starts a new bundle.&quot;&quot;&quot;</span><!-- Base implementation is a no-op hook. Subclasses override it when they keep
per-bundle state (for example _TestStreamEvaluator resets its replay state).
-->
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">process_timer_wrapper</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timer_firing</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Process timer by clearing and then calling process_timer().</span>
<span class="sd"> This method is called with any timer firing and clears the delivered</span>
<span class="sd"> timer from the keyed state and then calls process_timer(). The default</span>
<span class="sd"> process_timer() implementation emits a KeyedWorkItem for the particular</span>
<span class="sd"> timer and passes it to process_element(). Evaluator subclasses which</span>
<span class="sd"> desire different timer delivery semantics can override process_timer().</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="n">timer_firing</span><span class="o">.</span><span class="n">encoded_key</span><span class="p">)</span><!-- keyed state is looked up by the encoded key carried on the timer firing -->
<span class="n">state</span><span class="o">.</span><span class="n">clear_timer</span><span class="p">(</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">window</span><span class="p">,</span> <span class="n">timer_firing</span><span class="o">.</span><span class="n">name</span><span class="p">,</span> <span class="n">timer_firing</span><span class="o">.</span><span class="n">time_domain</span><span class="p">)</span><!-- the delivered timer is cleared before process_timer() runs -->
<span class="bp">self</span><span class="o">.</span><span class="n">process_timer</span><span class="p">(</span><span class="n">timer_firing</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process_timer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timer_firing</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Default process_timer() impl. generating KeyedWorkItem element.&quot;&quot;&quot;</span><!-- Wraps the single timer firing in a KeyedWorkItem for its encoded key,
places that item in the global window via GlobalWindows.windowed_value, and
hands it to process_element(). The base class process_element() raises, so
evaluators that receive timers must override one of the two methods.
-->
<span class="bp">self</span><span class="o">.</span><span class="n">process_element</span><span class="p">(</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span>
<span class="n">KeyedWorkItem</span><span class="p">(</span><span class="n">timer_firing</span><span class="o">.</span><span class="n">encoded_key</span><span class="p">,</span>
<span class="n">timer_firings</span><span class="o">=</span><span class="p">[</span><span class="n">timer_firing</span><span class="p">])))</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Processes a new element as part of the current bundle.&quot;&quot;&quot;</span><!-- Base implementation always raises NotImplementedError naming the concrete
evaluator type; evaluators that consume input elements override it.
NOTE(review): the message reads "do not process elements"; "does not" was
probably intended. Left unchanged here because it is runtime text.
-->
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="s1">&#39;</span><span class="si">%s</span><span class="s1"> do not process elements.&#39;</span> <span class="o">%</span> <span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Finishes the bundle and produces output.&quot;&quot;&quot;</span><!-- Base implementation returns None. The subclasses visible in this module
return a TransformResult instead. NOTE(review): confirm callers never rely on
this base implementation, since None is not a usable result.
-->
<span class="k">pass</span>
<span class="k">class</span> <span class="nc">_BoundedReadEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;TransformEvaluator for bounded Read transform.&quot;&quot;&quot;</span><!-- _BoundedReadEvaluator reads its entire source inside finish_bundle():
each element is wrapped in the global window, the results are collected
into a list, and the list is split into bundles of at most
MAX_ELEMENT_PER_BUNDLE elements (every element counts as size 1).
Side inputs are not supported.
-->
<span class="c1"># After some benchmarks, 1000 was optimal among {100,1000,10000}</span>
<span class="n">MAX_ELEMENT_PER_BUNDLE</span> <span class="o">=</span> <span class="mi">1000</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source</span> <span class="o">=</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">source</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source</span><span class="o">.</span><span class="n">pipeline_options</span> <span class="o">=</span> <span class="n">evaluation_context</span><span class="o">.</span><span class="n">pipeline_options</span><!-- assigns pipeline_options directly onto the source object held by the transform -->
<span class="nb">super</span><span class="p">(</span><span class="n">_BoundedReadEvaluator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span> <span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">def</span> <span class="nf">_read_values_to_bundles</span><span class="p">(</span><span class="n">reader</span><span class="p">):</span>
<span class="n">read_result</span> <span class="o">=</span> <span class="p">[</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">reader</span><span class="p">]</span><!-- materializes every element read from the source before bundling -->
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_split_list_into_bundles</span><span class="p">(</span>
<span class="n">output_pcollection</span><span class="p">,</span> <span class="n">read_result</span><span class="p">,</span>
<span class="n">_BoundedReadEvaluator</span><span class="o">.</span><span class="n">MAX_ELEMENT_PER_BUNDLE</span><span class="p">,</span> <span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="mi">1</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_source</span><span class="p">,</span> <span class="n">io</span><span class="o">.</span><span class="n">iobase</span><span class="o">.</span><span class="n">BoundedSource</span><span class="p">):</span>
<span class="c1"># Getting a RangeTracker for the default range of the source and reading</span>
<span class="c1"># the full source using that.</span>
<span class="n">range_tracker</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source</span><span class="o">.</span><span class="n">get_range_tracker</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="n">reader</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="n">range_tracker</span><span class="p">)</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="n">_read_values_to_bundles</span><span class="p">(</span><span class="n">reader</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span><!-- sources that are not iobase.BoundedSource are read through their reader() context manager -->
<span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source</span><span class="o">.</span><span class="n">reader</span><span class="p">()</span> <span class="k">as</span> <span class="n">reader</span><span class="p">:</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="n">_read_values_to_bundles</span><span class="p">(</span><span class="n">reader</span><span class="p">)</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">bundles</span><span class="p">,</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span><!-- NOTE(review): the meaning of the trailing [], None, None arguments is defined by TransformResult; confirm there -->
<span class="k">class</span> <span class="nc">_TestStreamEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;TransformEvaluator for the TestStream transform.</span>
<span class="sd"> Replays the events of a TestStream one at a time. Each input element</span>
<span class="sd"> carries, as its value, the index of the event to replay and, as its</span>
<span class="sd"> timestamp, the current watermark; finish_bundle() re-schedules the next</span>
<span class="sd"> index as an unprocessed bundle until every event has been replayed.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">test_stream</span> <span class="o">=</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_TestStreamEvaluator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span> <span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Resets the replay position, the watermark and the output bundles.&quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">current_index</span> <span class="o">=</span> <span class="o">-</span><span class="mi">1</span>
<span class="bp">self</span><span class="o">.</span><span class="n">watermark</span> <span class="o">=</span> <span class="n">MIN_TIMESTAMP</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Replays the TestStream event whose index is element.value.</span>
<span class="sd"> Args:</span>
<span class="sd"> element: windowed value whose value is an int event index and whose</span>
<span class="sd"> timestamp becomes the current watermark.</span>
<span class="sd"> Raises:</span>
<span class="sd"> AssertionError: if the index is not an int within the event list.</span>
<span class="sd"> ValueError: if the event is not an ElementEvent, WatermarkEvent or</span>
<span class="sd"> ProcessingTimeEvent.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">index</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span>
<span class="bp">self</span><span class="o">.</span><span class="n">watermark</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">timestamp</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">index</span><span class="p">,</span> <span class="nb">int</span><span class="p">)</span>
<span class="c1"># Valid indices are 0 .. len(events) - 1; the index subscripts events below,</span>
<span class="c1"># so an upper bound of len(events) itself would surface as an IndexError.</span>
<span class="k">assert</span> <span class="mi">0</span> <span class="o">&lt;=</span> <span class="n">index</span> <span class="o">&lt;</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">test_stream</span><span class="o">.</span><span class="n">events</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">current_index</span> <span class="o">=</span> <span class="n">index</span>
<span class="n">event</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">test_stream</span><span class="o">.</span><span class="n">events</span><span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">current_index</span><span class="p">]</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">event</span><span class="p">,</span> <span class="n">ElementEvent</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">output_pcollection</span><span class="p">)</span>
<span class="k">for</span> <span class="n">tv</span> <span class="ow">in</span> <span class="n">event</span><span class="o">.</span><span class="n">timestamped_values</span><span class="p">:</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">output</span><span class="p">(</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="n">tv</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">=</span><span class="n">tv</span><span class="o">.</span><span class="n">timestamp</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bundles</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">bundle</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">event</span><span class="p">,</span> <span class="n">WatermarkEvent</span><span class="p">):</span>
<span class="k">assert</span> <span class="n">event</span><span class="o">.</span><span class="n">new_watermark</span> <span class="o">&gt;=</span> <span class="bp">self</span><span class="o">.</span><span class="n">watermark</span>
<span class="bp">self</span><span class="o">.</span><span class="n">watermark</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">new_watermark</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">event</span><span class="p">,</span> <span class="n">ProcessingTimeEvent</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">_watermark_manager</span><span class="o">.</span><span class="n">_clock</span><span class="o">.</span><span class="n">advance_time</span><span class="p">(</span>
<span class="n">event</span><span class="o">.</span><span class="n">advance_by</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># Wrap the event in a 1-tuple so %-formatting cannot try to unpack a</span>
<span class="c1"># tuple-like event and raise TypeError instead of this ValueError.</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Invalid TestStream event: </span><span class="si">%s</span><span class="s1">.&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">event</span><span class="p">,))</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns the replayed bundles and schedules the next event, if any.</span>
<span class="sd"> While events remain, an unprocessed bundle holding the next index</span>
<span class="sd"> (timestamped with the current watermark) is returned and hold is set to</span>
<span class="sd"> the current watermark; once the last event is replayed, hold stays None.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">unprocessed_bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">hold</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">current_index</span> <span class="o">&lt;</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">test_stream</span><span class="o">.</span><span class="n">events</span><span class="p">)</span> <span class="o">-</span> <span class="mi">1</span><span class="p">:</span>
<span class="n">unprocessed_bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span>
<span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">pipeline</span><span class="p">))</span>
<span class="n">unprocessed_bundle</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">current_index</span> <span class="o">+</span> <span class="mi">1</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">watermark</span><span class="p">))</span>
<span class="n">unprocessed_bundles</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">unprocessed_bundle</span><span class="p">)</span>
<span class="n">hold</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">watermark</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">bundles</span><span class="p">,</span> <span class="n">unprocessed_bundles</span><span class="p">,</span> <span class="kc">None</span><span class="p">,</span> <span class="p">{</span><span class="kc">None</span><span class="p">:</span> <span class="n">hold</span><span class="p">})</span>
<span class="k">class</span> <span class="nc">_PubSubSubscriptionWrapper</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Wrapper for managing temporary PubSub subscriptions.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">project</span><span class="p">,</span> <span class="n">short_topic_name</span><span class="p">,</span> <span class="n">short_sub_name</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initialize subscription wrapper.</span>
<span class="sd"> If short_sub_name is None, creates a temporary subscription to</span>
<span class="sd"> short_topic_name, which is deleted when this wrapper is finalized.</span>
<span class="sd"> Args:</span>
<span class="sd"> project: GCP project name used to build the subscription path and,</span>
<span class="sd"> for a temporary subscription, the topic path.</span>
<span class="sd"> short_topic_name: Valid topic name without</span>
<span class="sd"> &#39;projects/{project}/topics/&#39; prefix. May be None.</span>
<span class="sd"> Required if short_sub_name is None.</span>
<span class="sd"> short_sub_name: Valid subscription name without</span>
<span class="sd"> &#39;projects/{project}/subscriptions/&#39; prefix. May be None.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">google.cloud</span> <span class="k">import</span> <span class="n">pubsub</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sub_client</span> <span class="o">=</span> <span class="n">pubsub</span><span class="o">.</span><span class="n">SubscriberClient</span><span class="p">()</span>
<span class="k">if</span> <span class="n">short_sub_name</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sub_name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sub_client</span><span class="o">.</span><span class="n">subscription_path</span><span class="p">(</span>
<span class="n">project</span><span class="p">,</span> <span class="s1">&#39;beam_</span><span class="si">%d</span><span class="s1">_</span><span class="si">%x</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="nb">int</span><span class="p">(</span><span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()),</span> <span class="n">random</span><span class="o">.</span><span class="n">randrange</span><span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">32</span><span class="p">)))</span>
<span class="n">topic_name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sub_client</span><span class="o">.</span><span class="n">topic_path</span><span class="p">(</span><span class="n">project</span><span class="p">,</span> <span class="n">short_topic_name</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sub_client</span><span class="o">.</span><span class="n">create_subscription</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sub_name</span><span class="p">,</span> <span class="n">topic_name</span><span class="p">)</span>
<span class="c1"># Only a subscription created here is owned, and later deleted, by us.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_should_cleanup</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sub_name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sub_client</span><span class="o">.</span><span class="n">subscription_path</span><span class="p">(</span><span class="n">project</span><span class="p">,</span> <span class="n">short_sub_name</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_should_cleanup</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">def</span> <span class="nf">__del__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Deletes the subscription if this wrapper created it.&quot;&quot;&quot;</span>
<span class="c1"># getattr: if __init__ raised before _should_cleanup was assigned, __del__</span>
<span class="c1"># still runs on the partially built object and must not fail with an</span>
<span class="c1"># AttributeError; nothing was created in that case, so nothing is deleted.</span>
<span class="k">if</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;_should_cleanup&#39;</span><span class="p">,</span> <span class="kc">False</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sub_client</span><span class="o">.</span><span class="n">delete_subscription</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sub_name</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_PubSubReadEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;TransformEvaluator for PubSub read.&quot;&quot;&quot;</span>
<span class="c1"># A mapping of transform to _PubSubSubscriptionWrapper.</span>
<span class="n">_subscription_cache</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">):</span><!-- _PubSubReadEvaluator.__init__: takes no side inputs, resolves the PubSub
source from the transform, rejects sources with an id_label (unsupported on
the DirectRunner), and obtains the subscription path from get_subscription(),
which may create a temporary subscription the first time a transform is seen.
-->
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_PubSubReadEvaluator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span> <span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">source</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">_source</span><!-- reads the transform's private _source attribute -->
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">id_label</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span>
<span class="s1">&#39;DirectRunner: id_label is not supported for PubSub reads&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sub_name</span> <span class="o">=</span> <span class="n">_PubSubReadEvaluator</span><span class="o">.</span><span class="n">get_subscription</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">project</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">topic_name</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">subscription_name</span><span class="p">)</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">get_subscription</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">transform</span><span class="p">,</span> <span class="n">project</span><span class="p">,</span> <span class="n">topic</span><span class="p">,</span> <span class="n">short_sub_name</span><span class="p">):</span><!-- get_subscription: returns the full subscription path for a transform,
creating and caching a _PubSubSubscriptionWrapper on first use. The cache
is a class attribute, so each wrapper (and any temporary subscription it
owns) stays alive as long as its cache entry does.
NOTE(review): no locking is done here; confirm evaluators for the same
transform are never constructed concurrently.
-->
<span class="k">if</span> <span class="n">transform</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_subscription_cache</span><span class="p">:</span>
<span class="n">wrapper</span> <span class="o">=</span> <span class="n">_PubSubSubscriptionWrapper</span><span class="p">(</span><span class="n">project</span><span class="p">,</span> <span class="n">topic</span><span class="p">,</span> <span class="n">short_sub_name</span><span class="p">)</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_subscription_cache</span><span class="p">[</span><span class="n">transform</span><span class="p">]</span> <span class="o">=</span> <span class="n">wrapper</span>
<span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_subscription_cache</span><span class="p">[</span><span class="n">transform</span><span class="p">]</span><span class="o">.</span><span class="n">sub_name</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">_read_from_pubsub</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timestamp_attribute</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.pubsub</span> <span class="k">import</span> <span class="n">PubsubMessage</span>
<span class="kn">from</span> <span class="nn">google.cloud</span> <span class="k">import</span> <span class="n">pubsub</span>
<span class="k">def</span> <span class="nf">_get_element</span><span class="p">(</span><span class="n">message</span><span class="p">):</span>
<span class="n">parsed_message</span> <span class="o">=</span> <span class="n">PubsubMessage</span><span class="o">.</span><span class="n">_from_message</span><span class="p">(</span><span class="n">message</span><span class="p">)</span>
<span class="k">if</span> <span class="p">(</span><span class="n">timestamp_attribute</span> <span class="ow">and</span>
<span class="n">timestamp_attribute</span> <span class="ow">in</span> <span class="n">parsed_message</span><span class="o">.</span><span class="n">attributes</span><span class="p">):</span>
<span class="n">rfc3339_or_milli</span> <span class="o">=</span> <span class="n">parsed_message</span><span class="o">.</span><span class="n">attributes</span><span class="p">[</span><span class="n">timestamp_attribute</span><span class="p">]</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">timestamp</span> <span class="o">=</span> <span class="n">Timestamp</span><span class="o">.</span><span class="n">from_rfc3339</span><span class="p">(</span><span class="n">rfc3339_or_milli</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">ValueError</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">timestamp</span> <span class="o">=</span> <span class="n">Timestamp</span><span class="p">(</span><span class="n">micros</span><span class="o">=</span><span class="nb">int</span><span class="p">(</span><span class="n">rfc3339_or_milli</span><span class="p">)</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">ValueError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Bad timestamp value: </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">e</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">timestamp</span> <span class="o">=</span> <span class="n">Timestamp</span><span class="p">(</span><span class="n">message</span><span class="o">.</span><span class="n">publish_time</span><span class="o">.</span><span class="n">seconds</span><span class="p">,</span>
<span class="n">message</span><span class="o">.</span><span class="n">publish_time</span><span class="o">.</span><span class="n">nanos</span> <span class="o">//</span> <span class="mi">1000</span><span class="p">)</span>
<span class="k">return</span> <span class="n">timestamp</span><span class="p">,</span> <span class="n">parsed_message</span>
<span class="c1"># Because messages are acknowledged right after they are pulled, we cannot reread them if this</span>
<span class="c1"># evaluator fails with an exception before emitting a bundle. However,</span>
<span class="c1"># the DirectRunner currently doesn&#39;t retry work items anyway, so the</span>
<span class="c1"># pipeline would enter an inconsistent state on any error.</span>
<span class="n">sub_client</span> <span class="o">=</span> <span class="n">pubsub</span><span class="o">.</span><span class="n">SubscriberClient</span><span class="p">()</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">response</span> <span class="o">=</span> <span class="n">sub_client</span><span class="o">.</span><span class="n">pull</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_sub_name</span><span class="p">,</span> <span class="n">max_messages</span><span class="o">=</span><span class="mi">10</span><span class="p">,</span>
<span class="n">return_immediately</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="n">results</span> <span class="o">=</span> <span class="p">[</span><span class="n">_get_element</span><span class="p">(</span><span class="n">rm</span><span class="o">.</span><span class="n">message</span><span class="p">)</span> <span class="k">for</span> <span class="n">rm</span> <span class="ow">in</span> <span class="n">response</span><span class="o">.</span><span class="n">received_messages</span><span class="p">]</span>
<span class="n">ack_ids</span> <span class="o">=</span> <span class="p">[</span><span class="n">rm</span><span class="o">.</span><span class="n">ack_id</span> <span class="k">for</span> <span class="n">rm</span> <span class="ow">in</span> <span class="n">response</span><span class="o">.</span><span class="n">received_messages</span><span class="p">]</span>
<span class="k">if</span> <span class="n">ack_ids</span><span class="p">:</span>
<span class="n">sub_client</span><span class="o">.</span><span class="n">acknowledge</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_sub_name</span><span class="p">,</span> <span class="n">ack_ids</span><span class="p">)</span>
<span class="k">finally</span><span class="p">:</span>
<span class="n">sub_client</span><span class="o">.</span><span class="n">api</span><span class="o">.</span><span class="n">transport</span><span class="o">.</span><span class="n">channel</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="k">return</span> <span class="n">results</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">data</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_from_pubsub</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">timestamp_attribute</span><span class="p">)</span>
<span class="k">if</span> <span class="n">data</span><span class="p">:</span>
<span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">output_pcollection</span><span class="p">)</span>
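<span class="c1"># TODO(ccy): Respect the PubSub source&#39;s id_label field (a non-empty id_label is currently rejected with NotImplementedError when this evaluator is constructed).</span>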
<span class="k">for</span> <span class="n">timestamp</span><span class="p">,</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">data</span><span class="p">:</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">with_attributes</span><span class="p">:</span>
<span class="n">element</span> <span class="o">=</span> <span class="n">message</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">element</span> <span class="o">=</span> <span class="n">message</span><span class="o">.</span><span class="n">data</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">output</span><span class="p">(</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">=</span><span class="n">timestamp</span><span class="p">))</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[</span><span class="n">bundle</span><span class="p">]</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">inputs</span><span class="p">:</span>
<span class="n">input_pvalue</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">inputs</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">input_pvalue</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span>
<span class="n">unprocessed_bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span>
<span class="n">input_pvalue</span><span class="p">)</span>
<span class="c1"># TODO(udim): Correct value for watermark hold.</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">bundles</span><span class="p">,</span> <span class="p">[</span><span class="n">unprocessed_bundle</span><span class="p">],</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">{</span><span class="kc">None</span><span class="p">:</span> <span class="n">Timestamp</span><span class="o">.</span><span class="n">of</span><span class="p">(</span><span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">())})</span>
<span class="k">class</span> <span class="nc">_FlattenEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;TransformEvaluator for Flatten transform.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_FlattenEvaluator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span> <span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">output_pcollection</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bundle</span><span class="o">.</span><span class="n">output</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">bundle</span><span class="p">]</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">bundles</span><span class="p">,</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_TaggedReceivers</span><span class="p">(</span><span class="nb">dict</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Receives ParDo output and redirects it to the associated output bundle.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span> <span class="o">=</span> <span class="n">evaluation_context</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_null_receiver</span> <span class="o">=</span> <span class="kc">None</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_TaggedReceivers</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="k">class</span> <span class="nc">NullReceiver</span><span class="p">(</span><span class="n">common</span><span class="o">.</span><span class="n">Receiver</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Ignores undeclared outputs, default execution mode.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">receive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="k">pass</span>
<span class="k">class</span> <span class="nc">_InMemoryReceiver</span><span class="p">(</span><span class="n">common</span><span class="o">.</span><span class="n">Receiver</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Buffers undeclared outputs to the given dictionary.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">target</span><span class="p">,</span> <span class="n">tag</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_target</span> <span class="o">=</span> <span class="n">target</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_tag</span> <span class="o">=</span> <span class="n">tag</span>
<span class="k">def</span> <span class="nf">receive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_target</span><span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">_tag</span><span class="p">]</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">__missing__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_null_receiver</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_null_receiver</span> <span class="o">=</span> <span class="n">_TaggedReceivers</span><span class="o">.</span><span class="n">NullReceiver</span><span class="p">()</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_null_receiver</span>
<span class="k">class</span> <span class="nc">_ParDoEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;TransformEvaluator for ParDo transform.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">,</span>
<span class="n">perform_dofn_pickle_test</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_ParDoEvaluator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span> <span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="c1"># This is a workaround for the SDF implementation, which adds state</span>
<span class="c1"># to the SDF that is not picklable.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_perform_dofn_pickle_test</span> <span class="o">=</span> <span class="n">perform_dofn_pickle_test</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">transform</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_tagged_receivers</span> <span class="o">=</span> <span class="n">_TaggedReceivers</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="p">)</span>
<span class="k">for</span> <span class="n">output_tag</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">outputs</span><span class="p">:</span>
<span class="n">output_pcollection</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">tag</span><span class="o">=</span><span class="n">output_tag</span><span class="p">)</span>
<span class="n">output_pcollection</span><span class="o">.</span><span class="n">producer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_tagged_receivers</span><span class="p">[</span><span class="n">output_tag</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">output_pcollection</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_tagged_receivers</span><span class="p">[</span><span class="n">output_tag</span><span class="p">]</span><span class="o">.</span><span class="n">tag</span> <span class="o">=</span> <span class="n">output_tag</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_counter_factory</span> <span class="o">=</span> <span class="n">counters</span><span class="o">.</span><span class="n">CounterFactory</span><span class="p">()</span>
<span class="c1"># TODO(aaltay): Consider storing the serialized form as an optimization.</span>
<span class="n">dofn</span> <span class="o">=</span> <span class="p">(</span><span class="n">pickler</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">pickler</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="n">transform</span><span class="o">.</span><span class="n">dofn</span><span class="p">))</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_perform_dofn_pickle_test</span> <span class="k">else</span> <span class="n">transform</span><span class="o">.</span><span class="n">dofn</span><span class="p">)</span>
<span class="n">args</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">args</span> <span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="s1">&#39;args&#39;</span><span class="p">)</span> <span class="k">else</span> <span class="p">[]</span>
<span class="n">kwargs</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">kwargs</span> <span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="s1">&#39;kwargs&#39;</span><span class="p">)</span> <span class="k">else</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">user_state_context</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">user_timer_map</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">if</span> <span class="n">is_stateful_dofn</span><span class="p">(</span><span class="n">dofn</span><span class="p">):</span>
<span class="n">kv_type_hint</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">inputs</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">element_type</span>
<span class="k">if</span> <span class="n">kv_type_hint</span> <span class="ow">and</span> <span class="n">kv_type_hint</span> <span class="o">!=</span> <span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">:</span>
<span class="n">coder</span> <span class="o">=</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="n">kv_type_hint</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span> <span class="o">=</span> <span class="n">coder</span><span class="o">.</span><span class="n">key_coder</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span> <span class="o">=</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">user_state_context</span> <span class="o">=</span> <span class="n">DirectUserStateContext</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="p">,</span> <span class="n">dofn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span><span class="p">)</span>
<span class="n">_</span><span class="p">,</span> <span class="n">all_timer_specs</span> <span class="o">=</span> <span class="n">get_dofn_specs</span><span class="p">(</span><span class="n">dofn</span><span class="p">)</span>
<span class="k">for</span> <span class="n">timer_spec</span> <span class="ow">in</span> <span class="n">all_timer_specs</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">user_timer_map</span><span class="p">[</span><span class="s1">&#39;user/</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">timer_spec</span><span class="o">.</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">timer_spec</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span> <span class="o">=</span> <span class="n">DoFnRunner</span><span class="p">(</span>
<span class="n">dofn</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_side_inputs</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">inputs</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">windowing</span><span class="p">,</span>
<span class="n">tagged_receivers</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_tagged_receivers</span><span class="p">,</span>
<span class="n">step_name</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">full_label</span><span class="p">,</span>
<span class="n">state</span><span class="o">=</span><span class="n">DoFnState</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_counter_factory</span><span class="p">),</span>
<span class="n">user_state_context</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">user_state_context</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">process_timer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timer_firing</span><span class="p">):</span>
<span class="k">if</span> <span class="n">timer_firing</span><span class="o">.</span><span class="n">name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">user_timer_map</span><span class="p">:</span>
<span class="n">logging</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s1">&#39;Unknown timer fired: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">timer_firing</span><span class="p">)</span>
<span class="n">timer_spec</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">user_timer_map</span><span class="p">[</span><span class="n">timer_firing</span><span class="o">.</span><span class="n">name</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">process_user_timer</span><span class="p">(</span>
<span class="n">timer_spec</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="n">timer_firing</span><span class="o">.</span><span class="n">encoded_key</span><span class="p">),</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">window</span><span class="p">,</span> <span class="n">timer_firing</span><span class="o">.</span><span class="n">timestamp</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">process</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<!--
  Rendered source of finish_bundle: calls self.runner.finish(), collects every
  bundle held in self._tagged_receivers, reads the counters produced by
  self._counter_factory, commits self.user_state_context when one is set, and
  returns a TransformResult carrying those bundles and counters. The last
  TransformResult argument is None here; the grouping evaluators in this
  module pass a per-key watermark-hold mapping in that position.
--><span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">finish</span><span class="p">()</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_tagged_receivers</span><span class="o">.</span><span class="n">values</span><span class="p">())</span>
<span class="n">result_counters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_counter_factory</span><span class="o">.</span><span class="n">get_counters</span><span class="p">()</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">user_state_context</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">user_state_context</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">bundles</span><span class="p">,</span> <span class="p">[],</span> <span class="n">result_counters</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<!--
  Rendered source of _GroupByKeyOnlyEvaluator (batch grouping).
  Values are buffered per encoded key in keyed state under ELEMENTS_TAG. The
  grouped output is produced only in a bundle whose input watermark equals
  WatermarkManager.WATERMARK_POS_INF. COMPLETION_TAG, kept in the global state
  (keyed state for key None), records that the output was already emitted.
  NOTE(review): this page appears to be generated (Sphinx module-code view)
  from apache_beam/runners/direct/transform_evaluator.py. Fix the issues
  flagged in these comments in that module and regenerate the page, rather
  than editing the highlighted markup.
--><span class="k">class</span> <span class="nc">_GroupByKeyOnlyEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;TransformEvaluator for _GroupByKeyOnly transform.&quot;&quot;&quot;</span>
<span class="n">MAX_ELEMENT_PER_BUNDLE</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">ELEMENTS_TAG</span> <span class="o">=</span> <span class="n">_ListStateTag</span><span class="p">(</span><span class="s1">&#39;elements&#39;</span><span class="p">)</span>
<span class="n">COMPLETION_TAG</span> <span class="o">=</span> <span class="n">_CombiningValueStateTag</span><span class="p">(</span><span class="s1">&#39;completed&#39;</span><span class="p">,</span> <span class="nb">any</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_GroupByKeyOnlyEvaluator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span> <span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_is_final_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_execution_context</span><span class="o">.</span><span class="n">watermarks</span><span class="o">.</span><span class="n">input_watermark</span>
<span class="o">==</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">global_state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="bp">self</span><span class="o">.</span><span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="c1"># The output type of a GroupByKey will be Tuple[Any, Any] or more specific.</span>
<span class="c1"># TODO(BEAM-2717): Infer coders earlier.</span>
<span class="n">kv_type_hint</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">outputs</span><span class="p">[</span><span class="kc">None</span><span class="p">]</span><span class="o">.</span><span class="n">element_type</span>
<span class="ow">or</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">get_type_hints</span><span class="p">()</span><span class="o">.</span><span class="n">input_types</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">])</span>
<!--
  NOTE(review): the streaming evaluators in this module fall back to
  typing.Any when the hint is falsy. This line has no such fallback:
  tuple_types[0] assumes a Tuple hint. Confirm that assumption always holds
  for _GroupByKeyOnly.
--><span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span> <span class="o">=</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="n">kv_type_hint</span><span class="o">.</span><span class="n">tuple_types</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="k">def</span> <span class="nf">process_timer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timer_firing</span><span class="p">):</span>
<span class="c1"># We do not need to emit a KeyedWorkItem to process_element().</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">get_state</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="n">_GroupByKeyOnlyEvaluator</span><span class="o">.</span><span class="n">COMPLETION_TAG</span><span class="p">)</span>
<!--
  NOTE(review): collections.Iterable is a deprecated alias that was removed in
  Python 3.10. The upstream module should test against
  collections.abc.Iterable instead.
  len(element.value) raises TypeError for iterables without __len__ (for
  example, generators) instead of reaching the TypeCheckError branch.
  Only the value v is buffered. The element's window and timestamp are not
  stored, and finish_bundle emits its results in the global window.
--><span class="k">if</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">WindowedValue</span><span class="p">)</span>
<span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="n">collections</span><span class="o">.</span><span class="n">Iterable</span><span class="p">)</span>
<span class="ow">and</span> <span class="nb">len</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">)</span> <span class="o">==</span> <span class="mi">2</span><span class="p">):</span>
<span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span>
<span class="n">encoded_k</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">k</span><span class="p">)</span>
<span class="n">state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="n">encoded_k</span><span class="p">)</span>
<span class="n">state</span><span class="o">.</span><span class="n">add_state</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">_GroupByKeyOnlyEvaluator</span><span class="o">.</span><span class="n">ELEMENTS_TAG</span><span class="p">,</span> <span class="n">v</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">TypeCheckError</span><span class="p">(</span><span class="s1">&#39;Input to _GroupByKeyOnly must be a PCollection of &#39;</span>
<span class="s1">&#39;windowed key-value pairs. Instead received: </span><span class="si">%r</span><span class="s1">.&#39;</span>
<span class="o">%</span> <span class="n">element</span><span class="p">)</span>
<!--
  Behavior of finish_bundle:
  - Input watermark has not reached WATERMARK_POS_INF: nothing is emitted and
    the step reports a WATERMARK_NEG_INF hold. A WATERMARK timer for
    WATERMARK_POS_INF is also set on the global state (presumably so the step
    is evaluated again once its input is complete; confirm in the executor).
  - Final bundle: one (key, values) pair per buffered key is emitted in the
    global window, unless COMPLETION_TAG is already set. This path then sets
    COMPLETION_TAG and reports a WATERMARK_POS_INF hold.
  Holds are returned as a mapping keyed by None.
--><span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_is_final_bundle</span><span class="p">():</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">get_state</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="n">_GroupByKeyOnlyEvaluator</span><span class="o">.</span><span class="n">COMPLETION_TAG</span><span class="p">):</span>
<span class="c1"># Ignore empty bundles after emitting output. (This may happen because</span>
<span class="c1"># empty bundles do not affect input watermarks.)</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">gbk_result</span> <span class="o">=</span> <span class="p">[]</span>
<span class="c1"># TODO(ccy): perhaps we can clean this up to not use this</span>
<span class="c1"># internal attribute of the DirectStepContext.</span>
<span class="k">for</span> <span class="n">encoded_k</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">existing_keyed_state</span><span class="p">:</span>
<span class="c1"># Ignore global state.</span>
<span class="k">if</span> <span class="n">encoded_k</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">continue</span>
<span class="n">k</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="n">encoded_k</span><span class="p">)</span>
<span class="n">state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="n">encoded_k</span><span class="p">)</span>
<span class="n">vs</span> <span class="o">=</span> <span class="n">state</span><span class="o">.</span><span class="n">get_state</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">_GroupByKeyOnlyEvaluator</span><span class="o">.</span><span class="n">ELEMENTS_TAG</span><span class="p">)</span>
<span class="n">gbk_result</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">((</span><span class="n">k</span><span class="p">,</span> <span class="n">vs</span><span class="p">)))</span>
<span class="k">def</span> <span class="nf">len_element_fn</span><span class="p">(</span><span class="n">element</span><span class="p">):</span>
<span class="n">_</span><span class="p">,</span> <span class="n">v</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span>
<span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="n">v</span><span class="p">)</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_split_list_into_bundles</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">output_pcollection</span><span class="p">,</span> <span class="n">gbk_result</span><span class="p">,</span>
<span class="n">_GroupByKeyOnlyEvaluator</span><span class="o">.</span><span class="n">MAX_ELEMENT_PER_BUNDLE</span><span class="p">,</span> <span class="n">len_element_fn</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">add_state</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="n">_GroupByKeyOnlyEvaluator</span><span class="o">.</span><span class="n">COMPLETION_TAG</span><span class="p">,</span> <span class="kc">True</span><span class="p">)</span>
<span class="n">hold</span> <span class="o">=</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">hold</span> <span class="o">=</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_NEG_INF</span>
<span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">set_timer</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">,</span> <span class="n">TimeDomain</span><span class="o">.</span><span class="n">WATERMARK</span><span class="p">,</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span><span class="p">)</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">bundles</span><span class="p">,</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="p">{</span><span class="kc">None</span><span class="p">:</span> <span class="n">hold</span><span class="p">})</span>
<!--
  Rendered source of _StreamingGroupByKeyOnlyEvaluator.
  Values are grouped by encoded key only within the current bundle, because
  gbk_items is recreated in start_bundle. finish_bundle emits one
  KeyedWorkItem per key in the global window and returns None where watermark
  holds would go.
  NOTE(review): MAX_ELEMENT_PER_BUNDLE is not referenced anywhere in this
  class.
--><span class="k">class</span> <span class="nc">_StreamingGroupByKeyOnlyEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;TransformEvaluator for _StreamingGroupByKeyOnly transform.</span>
<span class="sd"> The _GroupByKeyOnlyEvaluator buffers elements until its input watermark goes</span>
<span class="sd"> to infinity, which is suitable for batch mode execution. During streaming</span>
<span class="sd"> mode execution, we emit each bundle as it comes to the next transform.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">MAX_ELEMENT_PER_BUNDLE</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_StreamingGroupByKeyOnlyEvaluator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span> <span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gbk_items</span> <span class="o">=</span> <span class="n">collections</span><span class="o">.</span><span class="n">defaultdict</span><span class="p">(</span><span class="nb">list</span><span class="p">)</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="bp">self</span><span class="o">.</span><span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="c1"># The input type of a GroupByKey will be Tuple[Any, Any] or more specific.</span>
<span class="n">kv_type_hint</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">inputs</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">element_type</span>
<span class="n">key_type_hint</span> <span class="o">=</span> <span class="p">(</span><span class="n">kv_type_hint</span><span class="o">.</span><span class="n">tuple_types</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="k">if</span> <span class="n">kv_type_hint</span>
<span class="k">else</span> <span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span> <span class="o">=</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="n">key_type_hint</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<!--
  NOTE(review): collections.Iterable was removed in Python 3.10; the upstream
  module should use collections.abc.Iterable. len(element.value) raises
  TypeError for iterables without __len__ before the TypeCheckError branch is
  reached. Only v is kept; the element's window and timestamp are dropped, and
  the output is emitted in the global window.
--><span class="k">if</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">WindowedValue</span><span class="p">)</span>
<span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="n">collections</span><span class="o">.</span><span class="n">Iterable</span><span class="p">)</span>
<span class="ow">and</span> <span class="nb">len</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">)</span> <span class="o">==</span> <span class="mi">2</span><span class="p">):</span>
<span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gbk_items</span><span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">k</span><span class="p">)]</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">v</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">TypeCheckError</span><span class="p">(</span><span class="s1">&#39;Input to _GroupByKeyOnly must be a PCollection of &#39;</span>
<span class="s1">&#39;windowed key-value pairs. Instead received: </span><span class="si">%r</span><span class="s1">.&#39;</span>
<span class="o">%</span> <span class="n">element</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">for</span> <span class="n">encoded_k</span><span class="p">,</span> <span class="n">vs</span> <span class="ow">in</span> <span class="n">iteritems</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">gbk_items</span><span class="p">):</span>
<!--
  A single output bundle is created lazily, on the first key, so an empty
  input produces no bundles.
  NOTE(review): `if bundle is None` would state this intent without depending
  on whether the bundle type defines __len__ or __bool__.
--><span class="k">if</span> <span class="ow">not</span> <span class="n">bundle</span><span class="p">:</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">output_pcollection</span><span class="p">)</span>
<span class="n">bundles</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">bundle</span><span class="p">)</span>
<span class="n">kwi</span> <span class="o">=</span> <span class="n">KeyedWorkItem</span><span class="p">(</span><span class="n">encoded_k</span><span class="p">,</span> <span class="n">elements</span><span class="o">=</span><span class="n">vs</span><span class="p">)</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="n">kwi</span><span class="p">))</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">bundles</span><span class="p">,</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<!--
  Rendered source of _StreamingGroupAlsoByWindowEvaluator.
  Each input value must be a KeyedWorkItem. Its timer firings and then its
  elements are fed to a trigger driver built from the transform's windowing.
  The resulting (key, value) outputs are collected in gabw_items and emitted
  as one bundle in finish_bundle. For each encoded key, the keyed state's
  earliest hold is reported as that key's watermark hold.
--><span class="k">class</span> <span class="nc">_StreamingGroupAlsoByWindowEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;TransformEvaluator for the _StreamingGroupAlsoByWindow transform.</span>
<span class="sd"> This evaluator is only used in streaming mode. In batch mode, the</span>
<span class="sd"> GroupAlsoByWindow operation is evaluated as a normal DoFn, as defined</span>
<span class="sd"> in transforms/core.py.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_StreamingGroupAlsoByWindowEvaluator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span> <span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="bp">self</span><span class="o">.</span><span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<!--
  NOTE(review): the driver's clock is taken from the private attributes
  _evaluation_context._watermark_manager._clock. A public accessor on the
  evaluation context would decouple this evaluator from those internals.
--><span class="bp">self</span><span class="o">.</span><span class="n">driver</span> <span class="o">=</span> <span class="n">create_trigger_driver</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">windowing</span><span class="p">,</span>
<span class="n">clock</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">_watermark_manager</span><span class="o">.</span><span class="n">_clock</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gabw_items</span> <span class="o">=</span> <span class="p">[]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">keyed_holds</span> <span class="o">=</span> <span class="p">{}</span>
<span class="c1"># The input type (which is the same as the output type) of a</span>
<span class="c1"># GroupAlsoByWindow will be Tuple[Any, Iter[Any]] or more specific.</span>
<span class="n">kv_type_hint</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">outputs</span><span class="p">[</span><span class="kc">None</span><span class="p">]</span><span class="o">.</span><span class="n">element_type</span>
<span class="n">key_type_hint</span> <span class="o">=</span> <span class="p">(</span><span class="n">kv_type_hint</span><span class="o">.</span><span class="n">tuple_types</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="k">if</span> <span class="n">kv_type_hint</span>
<span class="k">else</span> <span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span> <span class="o">=</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="n">key_type_hint</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="n">kwi</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">kwi</span><span class="p">,</span> <span class="n">KeyedWorkItem</span><span class="p">),</span> <span class="n">kwi</span>
<span class="n">encoded_k</span><span class="p">,</span> <span class="n">timer_firings</span><span class="p">,</span> <span class="n">vs</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">kwi</span><span class="o">.</span><span class="n">encoded_key</span><span class="p">,</span> <span class="n">kwi</span><span class="o">.</span><span class="n">timer_firings</span><span class="p">,</span> <span class="n">kwi</span><span class="o">.</span><span class="n">elements</span><span class="p">)</span>
<span class="n">k</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="n">encoded_k</span><span class="p">)</span>
<span class="n">state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="n">encoded_k</span><span class="p">)</span>
<span class="k">for</span> <span class="n">timer_firing</span> <span class="ow">in</span> <span class="n">timer_firings</span><span class="p">:</span>
<span class="k">for</span> <span class="n">wvalue</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">driver</span><span class="o">.</span><span class="n">process_timer</span><span class="p">(</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">window</span><span class="p">,</span> <span class="n">timer_firing</span><span class="o">.</span><span class="n">name</span><span class="p">,</span> <span class="n">timer_firing</span><span class="o">.</span><span class="n">time_domain</span><span class="p">,</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">timestamp</span><span class="p">,</span> <span class="n">state</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gabw_items</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">wvalue</span><span class="o">.</span><span class="n">with_value</span><span class="p">((</span><span class="n">k</span><span class="p">,</span> <span class="n">wvalue</span><span class="o">.</span><span class="n">value</span><span class="p">)))</span>
<!--
  Timer firings of this work item are processed before its new elements.
  NOTE(review): MIN_TIMESTAMP is passed as the third argument of
  process_elements, presumably as an output watermark. Confirm this against
  the signature of the driver returned by create_trigger_driver.
--><span class="k">if</span> <span class="n">vs</span><span class="p">:</span>
<span class="k">for</span> <span class="n">wvalue</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">driver</span><span class="o">.</span><span class="n">process_elements</span><span class="p">(</span><span class="n">state</span><span class="p">,</span> <span class="n">vs</span><span class="p">,</span> <span class="n">MIN_TIMESTAMP</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gabw_items</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">wvalue</span><span class="o">.</span><span class="n">with_value</span><span class="p">((</span><span class="n">k</span><span class="p">,</span> <span class="n">wvalue</span><span class="o">.</span><span class="n">value</span><span class="p">)))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">keyed_holds</span><span class="p">[</span><span class="n">encoded_k</span><span class="p">]</span> <span class="o">=</span> <span class="n">state</span><span class="o">.</span><span class="n">get_earliest_hold</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">gabw_items</span><span class="p">:</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">output_pcollection</span><span class="p">)</span>
<span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">gabw_items</span><span class="p">:</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">item</span><span class="p">)</span>
<span class="n">bundles</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">bundle</span><span class="p">)</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">bundles</span><span class="p">,</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">keyed_holds</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_NativeWriteEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;TransformEvaluator for _NativeWrite transform.&quot;&quot;&quot;</span>
<span class="n">ELEMENTS_TAG</span> <span class="o">=</span> <span class="n">_ListStateTag</span><span class="p">(</span><span class="s1">&#39;elements&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_NativeWriteEvaluator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span> <span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">assert</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">sink</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sink</span> <span class="o">=</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">sink</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">_is_final_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_execution_context</span><span class="o">.</span><span class="n">watermarks</span><span class="o">.</span><span class="n">input_watermark</span>
<span class="o">==</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span><span class="p">)</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">_has_already_produced_output</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_execution_context</span><span class="o">.</span><span class="n">watermarks</span><span class="o">.</span><span class="n">output_watermark</span>
<span class="o">==</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">global_state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process_timer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timer_firing</span><span class="p">):</span>
<span class="c1"># We do not need to emit a KeyedWorkItem to process_element().</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">add_state</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="n">_NativeWriteEvaluator</span><span class="o">.</span><span class="n">ELEMENTS_TAG</span><span class="p">,</span> <span class="n">element</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># finish_bundle will append incoming bundles in memory until all the bundles</span>
<span class="c1"># carrying data are processed. This is done to produce only a single output</span>
<span class="c1"># shard (some tests depend on this behavior). It is possible to have</span>
<span class="c1"># incoming empty bundles after the output is produced; these bundles will be</span>
<span class="c1"># ignored and will not generate additional output files.</span>
<span class="c1"># TODO(altay): Do not wait until the last bundle to write in a single shard.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_is_final_bundle</span><span class="p">:</span>
<span class="n">elements</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">get_state</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="n">_NativeWriteEvaluator</span><span class="o">.</span><span class="n">ELEMENTS_TAG</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_has_already_produced_output</span><span class="p">:</span>
<span class="c1"># Ignore empty bundles that arrive after the output is produced.</span>
<span class="k">assert</span> <span class="n">elements</span> <span class="o">==</span> <span class="p">[]</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sink</span><span class="o">.</span><span class="n">pipeline_options</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">pipeline_options</span>
<span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sink</span><span class="o">.</span><span class="n">writer</span><span class="p">()</span> <span class="k">as</span> <span class="n">writer</span><span class="p">:</span>
<span class="k">for</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">elements</span><span class="p">:</span>
<span class="n">writer</span><span class="o">.</span><span class="n">Write</span><span class="p">(</span><span class="n">v</span><span class="o">.</span><span class="n">value</span><span class="p">)</span>
<span class="n">hold</span> <span class="o">=</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">hold</span> <span class="o">=</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_NEG_INF</span>
<span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">set_timer</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">,</span> <span class="n">TimeDomain</span><span class="o">.</span><span class="n">WATERMARK</span><span class="p">,</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span><span class="p">)</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="p">[],</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="p">{</span><span class="kc">None</span><span class="p">:</span> <span class="n">hold</span><span class="p">})</span>
<span class="k">class</span> <span class="nc">_ProcessElementsEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;An evaluator for sdf_direct_runner.ProcessElements transform.&quot;&quot;&quot;</span>
<span class="c1"># Maximum number of elements that will be produced by a Splittable DoFn before</span>
<span class="c1"># a checkpoint is requested by the runner.</span>
<span class="n">DEFAULT_MAX_NUM_OUTPUTS</span> <span class="o">=</span> <span class="mi">100</span>
<span class="c1"># Maximum duration a Splittable DoFn will process an element before a</span>
<span class="c1"># checkpoint is requested by the runner.</span>
<span class="n">DEFAULT_MAX_DURATION</span> <span class="o">=</span> <span class="mi">1</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">):</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_ProcessElementsEvaluator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span> <span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="n">process_elements_transform</span> <span class="o">=</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">process_elements_transform</span><span class="p">,</span> <span class="n">ProcessElements</span><span class="p">)</span>
<span class="c1"># Replacing the do_fn of the transform with a wrapper do_fn that performs</span>
<span class="c1"># SDF magic.</span>
<span class="n">transform</span> <span class="o">=</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span>
<span class="n">sdf</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">sdf</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_process_fn</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">new_process_fn</span><span class="p">(</span><span class="n">sdf</span><span class="p">)</span>
<span class="n">transform</span><span class="o">.</span><span class="n">dofn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_process_fn</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_process_fn</span><span class="p">,</span> <span class="n">ProcessFn</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_process_fn</span><span class="o">.</span><span class="n">step_context</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span>
<span class="n">process_element_invoker</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">SDFProcessElementInvoker</span><span class="p">(</span>
<span class="n">max_num_outputs</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">DEFAULT_MAX_NUM_OUTPUTS</span><span class="p">,</span>
<span class="n">max_duration</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">DEFAULT_MAX_DURATION</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_process_fn</span><span class="o">.</span><span class="n">set_process_element_invoker</span><span class="p">(</span><span class="n">process_element_invoker</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_par_do_evaluator</span> <span class="o">=</span> <span class="n">_ParDoEvaluator</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span> <span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">,</span> <span class="n">perform_dofn_pickle_test</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">keyed_holds</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_par_do_evaluator</span><span class="o">.</span><span class="n">start_bundle</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">WindowedValue</span><span class="p">)</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">windows</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="n">window</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">windows</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="n">KeyedWorkItem</span><span class="p">):</span>
<span class="n">key</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="o">.</span><span class="n">encoded_key</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># If not a `KeyedWorkItem`, this must be a tuple where key is a randomly</span>
<span class="c1"># generated key and the value is a `WindowedValue` that contains an</span>
<span class="c1"># `ElementAndRestriction` object.</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">)</span>
<span class="n">key</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_par_do_evaluator</span><span class="o">.</span><span class="n">process_element</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="n">state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="n">key</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">keyed_holds</span><span class="p">[</span><span class="n">key</span><span class="p">]</span> <span class="o">=</span> <span class="n">state</span><span class="o">.</span><span class="n">get_state</span><span class="p">(</span>
<span class="n">window</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_process_fn</span><span class="o">.</span><span class="n">watermark_hold_tag</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">par_do_result</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_par_do_evaluator</span><span class="o">.</span><span class="n">finish_bundle</span><span class="p">()</span>
<span class="n">transform_result</span> <span class="o">=</span> <span class="n">TransformResult</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">par_do_result</span><span class="o">.</span><span class="n">uncommitted_output_bundles</span><span class="p">,</span>
<span class="n">par_do_result</span><span class="o">.</span><span class="n">unprocessed_bundles</span><span class="p">,</span> <span class="n">par_do_result</span><span class="o">.</span><span class="n">counters</span><span class="p">,</span>
<span class="n">par_do_result</span><span class="o">.</span><span class="n">keyed_watermark_holds</span><span class="p">,</span>
<span class="n">par_do_result</span><span class="o">.</span><span class="n">undeclared_tag_values</span><span class="p">)</span>
<span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">keyed_holds</span><span class="p">:</span>
<span class="n">transform_result</span><span class="o">.</span><span class="n">keyed_watermark_holds</span><span class="p">[</span><span class="n">key</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">keyed_holds</span><span class="p">[</span><span class="n">key</span><span class="p">]</span>
<span class="k">return</span> <span class="n">transform_result</span>
</pre></div>
</div>
<div class="articleComments">
</div>
</div>
<footer>
<hr>
<div role="contentinfo">
<p>
<!-- NOTE(review): the copyright holder is empty ("Copyright ."). This page is
     generated, so it is presumably rendered from the Sphinx copyright setting;
     confirm and fill the holder in the docs configuration rather than here. -->
&copy; Copyright .
</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
  // Page-level build settings exposed as a global; presumably consumed by
  // Sphinx's client-side helpers (e.g. doctools.js). Must stay a top-level var.
  var DOCUMENTATION_OPTIONS = {
    URL_ROOT: '../../../../',
    VERSION: '',
    COLLAPSE_INDEX: false,
    FILE_SUFFIX: '.html',
    HAS_SOURCE: true,
    SOURCELINK_SUFFIX: '.txt'
  };
</script>
<!-- Load order matters: jQuery first, then the Sphinx helpers and the theme
     script. These stay synchronous because the inline initializer right after
     them calls jQuery immediately. -->
<script src="../../../../_static/jquery.js"></script>
<script src="../../../../_static/underscore.js"></script>
<script src="../../../../_static/doctools.js"></script>
<script src="../../../../_static/js/theme.js"></script>
<script>
  // When the DOM is ready, switch on the theme's sticky side navigation.
  // SphinxRtdTheme is presumably defined by _static/js/theme.js.
  jQuery(function () {
    SphinxRtdTheme.StickyNav.enable();
  });
</script>
</body>
</html>