<!-- blob: f6b8b03866bd960c02552fd843468c7ca16f04c0 [file] [log] [blame] -->
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <!-- Character encoding is declared before any other content in the head. -->
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <title>apache_beam.runners.direct.transform_evaluator &mdash; Apache Beam 2.47.0 documentation</title>
  <!-- Classic scripts without async/defer run in document order; that order is
       kept exactly as generated. text/javascript is the default script type. -->
  <script src="../../../../_static/js/modernizr.min.js"></script>
  <script id="documentation_options" data-url_root="../../../../" src="../../../../_static/documentation_options.js"></script>
  <script src="../../../../_static/jquery.js"></script>
  <script src="../../../../_static/underscore.js"></script>
  <script src="../../../../_static/doctools.js"></script>
  <script src="../../../../_static/language_data.js"></script>
  <!-- async: MathJax executes as soon as it downloads, independent of the ordered scripts. -->
  <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
  <script src="../../../../_static/js/theme.js"></script>
  <link rel="stylesheet" href="../../../../_static/css/theme.css">
  <link rel="stylesheet" href="../../../../_static/pygments.css">
  <link rel="index" title="Index" href="../../../../genindex.html">
  <link rel="search" title="Search" href="../../../../search.html">
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side" aria-label="sidebar navigation">
  <div class="wy-side-scroll">
    <div class="wy-side-nav-search">
      <a href="../../../../index.html" class="icon icon-home"> Apache Beam
      </a>
      <div class="version">
        2.47.0
      </div>
      <div role="search">
        <form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get">
          <!-- A placeholder is not an accessible name (it vanishes on input and is
               not reliably announced), so the query field is labelled explicitly.
               type="text" is left unchanged in case the theme CSS selects on it. -->
          <input type="text" name="q" placeholder="Search docs" aria-label="Search docs">
          <input type="hidden" name="check_keywords" value="yes">
          <input type="hidden" name="area" value="default">
        </form>
      </div>
    </div>
    <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
      <!-- Package links -->
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.coders.html">apache_beam.coders package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.io.html">apache_beam.io package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.ml.html">apache_beam.ml package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.options.html">apache_beam.options package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.portability.html">apache_beam.portability package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.runners.html">apache_beam.runners package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.testing.html">apache_beam.testing package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.utils.html">apache_beam.utils package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.yaml.html">apache_beam.yaml package</a></li>
      </ul>
      <!-- Top-level module links -->
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.error.html">apache_beam.error module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
      </ul>
    </div>
  </div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
  <!-- Menu-toggle icon; presumably theme.js binds its click handler via the
       data-toggle attribute (not visible here - confirm before restructuring). -->
  <i class="fa fa-bars" data-toggle="wy-nav-top"></i>
  <a href="../../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
  <ul class="wy-breadcrumbs">
    <li><a href="../../../../index.html">Docs</a> &raquo;</li>
    <li><a href="../../../index.html">Module code</a> &raquo;</li>
    <!-- The final crumb is the page being viewed; aria-current lets assistive
         technology announce it as the current location. -->
    <li aria-current="page">apache_beam.runners.direct.transform_evaluator</li>
    <li class="wy-breadcrumbs-aside">
    </li>
  </ul>
  <hr>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.runners.direct.transform_evaluator</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;An evaluator of a specific application of a transform.&quot;&quot;&quot;</span>
<span class="c1"># pytype: skip-file</span>
<span class="kn">import</span> <span class="nn">atexit</span>
<span class="kn">import</span> <span class="nn">collections</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">random</span>
<span class="kn">import</span> <span class="nn">time</span>
<span class="kn">from</span> <span class="nn">collections</span> <span class="kn">import</span> <span class="n">abc</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">TYPE_CHECKING</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Any</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Dict</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">List</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Tuple</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Type</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">coders</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">io</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">pvalue</span>
<span class="kn">from</span> <span class="nn">apache_beam.internal</span> <span class="kn">import</span> <span class="n">pickler</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners</span> <span class="kn">import</span> <span class="n">common</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.common</span> <span class="kn">import</span> <span class="n">DoFnRunner</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.common</span> <span class="kn">import</span> <span class="n">DoFnState</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.dataflow.native_io.iobase</span> <span class="kn">import</span> <span class="n">_NativeWrite</span> <span class="c1"># pylint: disable=protected-access</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.direct_runner</span> <span class="kn">import</span> <span class="n">_DirectReadFromPubSub</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.direct_runner</span> <span class="kn">import</span> <span class="n">_GroupByKeyOnly</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.direct_runner</span> <span class="kn">import</span> <span class="n">_StreamingGroupAlsoByWindow</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.direct_runner</span> <span class="kn">import</span> <span class="n">_StreamingGroupByKeyOnly</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.direct_userstate</span> <span class="kn">import</span> <span class="n">DirectUserStateContext</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.sdf_direct_runner</span> <span class="kn">import</span> <span class="n">ProcessElements</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.sdf_direct_runner</span> <span class="kn">import</span> <span class="n">ProcessFn</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.sdf_direct_runner</span> <span class="kn">import</span> <span class="n">SDFProcessElementInvoker</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.test_stream_impl</span> <span class="kn">import</span> <span class="n">_TestStream</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.test_stream_impl</span> <span class="kn">import</span> <span class="n">_WatermarkController</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.util</span> <span class="kn">import</span> <span class="n">KeyedWorkItem</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.util</span> <span class="kn">import</span> <span class="n">TransformResult</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.direct.watermark_manager</span> <span class="kn">import</span> <span class="n">WatermarkManager</span>
<span class="kn">from</span> <span class="nn">apache_beam.testing.test_stream</span> <span class="kn">import</span> <span class="n">ElementEvent</span>
<span class="kn">from</span> <span class="nn">apache_beam.testing.test_stream</span> <span class="kn">import</span> <span class="n">PairWithTiming</span>
<span class="kn">from</span> <span class="nn">apache_beam.testing.test_stream</span> <span class="kn">import</span> <span class="n">ProcessingTimeEvent</span>
<span class="kn">from</span> <span class="nn">apache_beam.testing.test_stream</span> <span class="kn">import</span> <span class="n">TimingInfo</span>
<span class="kn">from</span> <span class="nn">apache_beam.testing.test_stream</span> <span class="kn">import</span> <span class="n">WatermarkEvent</span>
<span class="kn">from</span> <span class="nn">apache_beam.testing.test_stream</span> <span class="kn">import</span> <span class="n">WindowedValueHolder</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">core</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="kn">import</span> <span class="n">InMemoryUnmergedState</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="kn">import</span> <span class="n">TimeDomain</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="kn">import</span> <span class="n">_CombiningValueStateTag</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="kn">import</span> <span class="n">_ListStateTag</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="kn">import</span> <span class="n">_ReadModifyWriteStateTag</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.trigger</span> <span class="kn">import</span> <span class="n">create_trigger_driver</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.userstate</span> <span class="kn">import</span> <span class="n">get_dofn_specs</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.userstate</span> <span class="kn">import</span> <span class="n">is_stateful_dofn</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">GlobalWindows</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">WindowedValue</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints.typecheck</span> <span class="kn">import</span> <span class="n">TypeCheckError</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils</span> <span class="kn">import</span> <span class="n">counters</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils.timestamp</span> <span class="kn">import</span> <span class="n">MIN_TIMESTAMP</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils.timestamp</span> <span class="kn">import</span> <span class="n">Timestamp</span>
<!-- Python listing inside <pre>: leading whitespace is rendered verbatim and is significant. --><span class="k">if</span> <span class="n">TYPE_CHECKING</span><span class="p">:</span>
  <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.pubsub</span> <span class="kn">import</span> <span class="n">_PubSubSource</span>
  <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.pubsub</span> <span class="kn">import</span> <span class="n">PubsubMessage</span>
  <span class="kn">from</span> <span class="nn">apache_beam.pipeline</span> <span class="kn">import</span> <span class="n">AppliedPTransform</span>
  <span class="kn">from</span> <span class="nn">apache_beam.runners.direct.evaluation_context</span> <span class="kn">import</span> <span class="n">EvaluationContext</span>

<span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>


<!-- Python listing inside <pre>: leading whitespace is rendered verbatim, so each line keeps its Python indentation. --><div class="viewcode-block" id="TransformEvaluatorRegistry"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.TransformEvaluatorRegistry">[docs]</a><span class="k">class</span> <span class="nc">TransformEvaluatorRegistry</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w">  </span><span class="sd">&quot;&quot;&quot;For internal use only; no backwards-compatibility guarantees.</span>

<span class="sd">  Creates instances of TransformEvaluator for the application of a transform.</span>
<span class="sd">  &quot;&quot;&quot;</span>

  <span class="n">_test_evaluators_overrides</span> <span class="o">=</span> <span class="p">{</span>
  <span class="p">}</span>  <span class="c1"># type: Dict[Type[core.PTransform], Type[_TransformEvaluator]]</span>

  <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">):</span>
    <span class="c1"># type: (EvaluationContext) -&gt; None</span>
    <span class="k">assert</span> <span class="n">evaluation_context</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span> <span class="o">=</span> <span class="n">evaluation_context</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">_evaluators</span> <span class="o">=</span> <span class="p">{</span>
        <span class="n">io</span><span class="o">.</span><span class="n">Read</span><span class="p">:</span> <span class="n">_BoundedReadEvaluator</span><span class="p">,</span>
        <span class="n">_DirectReadFromPubSub</span><span class="p">:</span> <span class="n">_PubSubReadEvaluator</span><span class="p">,</span>
        <span class="n">core</span><span class="o">.</span><span class="n">Flatten</span><span class="p">:</span> <span class="n">_FlattenEvaluator</span><span class="p">,</span>
        <span class="n">core</span><span class="o">.</span><span class="n">Impulse</span><span class="p">:</span> <span class="n">_ImpulseEvaluator</span><span class="p">,</span>
        <span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">:</span> <span class="n">_ParDoEvaluator</span><span class="p">,</span>
        <span class="n">_GroupByKeyOnly</span><span class="p">:</span> <span class="n">_GroupByKeyOnlyEvaluator</span><span class="p">,</span>
        <span class="n">_StreamingGroupByKeyOnly</span><span class="p">:</span> <span class="n">_StreamingGroupByKeyOnlyEvaluator</span><span class="p">,</span>
        <span class="n">_StreamingGroupAlsoByWindow</span><span class="p">:</span> <span class="n">_StreamingGroupAlsoByWindowEvaluator</span><span class="p">,</span>
        <span class="n">_NativeWrite</span><span class="p">:</span> <span class="n">_NativeWriteEvaluator</span><span class="p">,</span>
        <span class="n">_TestStream</span><span class="p">:</span> <span class="n">_TestStreamEvaluator</span><span class="p">,</span>
        <span class="n">ProcessElements</span><span class="p">:</span> <span class="n">_ProcessElementsEvaluator</span><span class="p">,</span>
        <span class="n">_WatermarkController</span><span class="p">:</span> <span class="n">_WatermarkControllerEvaluator</span><span class="p">,</span>
        <span class="n">PairWithTiming</span><span class="p">:</span> <span class="n">_PairWithTimingEvaluator</span><span class="p">,</span>
    <span class="p">}</span>  <span class="c1"># type: Dict[Type[core.PTransform], Type[_TransformEvaluator]]</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">_evaluators</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_test_evaluators_overrides</span><span class="p">)</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">_root_bundle_providers</span> <span class="o">=</span> <span class="p">{</span>
        <span class="n">core</span><span class="o">.</span><span class="n">PTransform</span><span class="p">:</span> <span class="n">DefaultRootBundleProvider</span><span class="p">,</span>
        <span class="n">_TestStream</span><span class="p">:</span> <span class="n">_TestStreamRootBundleProvider</span><span class="p">,</span>
    <span class="p">}</span>

<div class="viewcode-block" id="TransformEvaluatorRegistry.get_evaluator"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.TransformEvaluatorRegistry.get_evaluator">[docs]</a>  <span class="k">def</span> <span class="nf">get_evaluator</span><span class="p">(</span>
      <span class="bp">self</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">,</span> <span class="n">input_committed_bundle</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">):</span>
<span class="w">    </span><span class="sd">&quot;&quot;&quot;Returns a TransformEvaluator suitable for processing given inputs.&quot;&quot;&quot;</span>
    <span class="k">assert</span> <span class="n">applied_ptransform</span>
    <span class="k">assert</span> <span class="nb">bool</span><span class="p">(</span><span class="n">applied_ptransform</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">)</span> <span class="o">==</span> <span class="nb">bool</span><span class="p">(</span><span class="n">side_inputs</span><span class="p">)</span>

    <span class="c1"># Walk up the class hierarchy to find an evaluable type. This is necessary</span>
    <span class="c1"># for supporting sub-classes of core transforms.</span>
    <span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="n">mro</span><span class="p">():</span>
      <span class="n">evaluator</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluators</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span>
      <span class="k">if</span> <span class="n">evaluator</span><span class="p">:</span>
        <span class="k">break</span>

    <span class="k">if</span> <span class="ow">not</span> <span class="n">evaluator</span><span class="p">:</span>
      <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span>
          <span class="s1">&#39;Execution of [</span><span class="si">%s</span><span class="s1">] not implemented in runner </span><span class="si">%s</span><span class="s1">.&#39;</span> <span class="o">%</span>
          <span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="p">),</span> <span class="bp">self</span><span class="p">))</span>
    <span class="k">return</span> <span class="n">evaluator</span><span class="p">(</span>
        <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="p">,</span>
        <span class="n">applied_ptransform</span><span class="p">,</span>
        <span class="n">input_committed_bundle</span><span class="p">,</span>
        <span class="n">side_inputs</span><span class="p">)</span></div>

<div class="viewcode-block" id="TransformEvaluatorRegistry.get_root_bundle_provider"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.TransformEvaluatorRegistry.get_root_bundle_provider">[docs]</a>  <span class="k">def</span> <span class="nf">get_root_bundle_provider</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">):</span>
    <span class="n">provider_cls</span> <span class="o">=</span> <span class="kc">None</span>
    <span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="n">mro</span><span class="p">():</span>
      <span class="n">provider_cls</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_root_bundle_providers</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span>
      <span class="k">if</span> <span class="n">provider_cls</span><span class="p">:</span>
        <span class="k">break</span>
    <span class="k">if</span> <span class="ow">not</span> <span class="n">provider_cls</span><span class="p">:</span>
      <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span>
          <span class="s1">&#39;Root provider for [</span><span class="si">%s</span><span class="s1">] not implemented in runner </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span>
          <span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="p">),</span> <span class="bp">self</span><span class="p">))</span>
    <span class="k">return</span> <span class="n">provider_cls</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">)</span></div>

<div class="viewcode-block" id="TransformEvaluatorRegistry.should_execute_serially"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.TransformEvaluatorRegistry.should_execute_serially">[docs]</a>  <span class="k">def</span> <span class="nf">should_execute_serially</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">):</span>
<span class="w">    </span><span class="sd">&quot;&quot;&quot;Returns True if this applied_ptransform should run one bundle at a time.</span>

<span class="sd">    Some TransformEvaluators use a global state object to keep track of their</span>
<span class="sd">    global execution state. For example evaluator for _GroupByKeyOnly uses this</span>
<span class="sd">    state as an in memory dictionary to buffer keys.</span>

<span class="sd">    Serially executed evaluators will act as syncing point in the graph and</span>
<span class="sd">    execution will not move forward until they receive all of their inputs. Once</span>
<span class="sd">    they receive all of their input, they will release the combined output.</span>
<span class="sd">    Their output may consist of multiple bundles as they may divide their output</span>
<span class="sd">    into pieces before releasing.</span>

<span class="sd">    Args:</span>
<span class="sd">      applied_ptransform: Transform to be used for execution.</span>

<span class="sd">    Returns:</span>
<span class="sd">      True if executor should execute applied_ptransform serially.</span>
<span class="sd">    &quot;&quot;&quot;</span>
    <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="p">,</span>
                  <span class="p">(</span><span class="n">_GroupByKeyOnly</span><span class="p">,</span>
                   <span class="n">_StreamingGroupByKeyOnly</span><span class="p">,</span>
                   <span class="n">_StreamingGroupAlsoByWindow</span><span class="p">,</span>
                   <span class="n">_NativeWrite</span><span class="p">)):</span>
      <span class="k">return</span> <span class="kc">True</span>
    <span class="k">elif</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="p">,</span> <span class="n">core</span><span class="o">.</span><span class="n">ParDo</span><span class="p">)</span> <span class="ow">and</span>
          <span class="n">is_stateful_dofn</span><span class="p">(</span><span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">dofn</span><span class="p">)):</span>
      <span class="k">return</span> <span class="kc">True</span>
    <span class="k">return</span> <span class="kc">False</span></div></div>


<!-- Python listing inside <pre>: indentation is part of the rendered code. --><div class="viewcode-block" id="RootBundleProvider"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.RootBundleProvider">[docs]</a><span class="k">class</span> <span class="nc">RootBundleProvider</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w">  </span><span class="sd">&quot;&quot;&quot;Provides bundles for the initial execution of a root transform.&quot;&quot;&quot;</span>
  <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">):</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span> <span class="o">=</span> <span class="n">evaluation_context</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span> <span class="o">=</span> <span class="n">applied_ptransform</span>

<div class="viewcode-block" id="RootBundleProvider.get_root_bundles"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.RootBundleProvider.get_root_bundles">[docs]</a>  <span class="k">def</span> <span class="nf">get_root_bundles</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
    <span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div>


<!-- Python listing inside <pre>: indentation is part of the rendered code. --><div class="viewcode-block" id="DefaultRootBundleProvider"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.DefaultRootBundleProvider">[docs]</a><span class="k">class</span> <span class="nc">DefaultRootBundleProvider</span><span class="p">(</span><span class="n">RootBundleProvider</span><span class="p">):</span>
<span class="w">  </span><span class="sd">&quot;&quot;&quot;Provides an empty bundle by default for root transforms.&quot;&quot;&quot;</span>
<div class="viewcode-block" id="DefaultRootBundleProvider.get_root_bundles"><a class="viewcode-back" href="../../../../apache_beam.runners.direct.transform_evaluator.html#apache_beam.runners.direct.transform_evaluator.DefaultRootBundleProvider.get_root_bundles">[docs]</a>  <span class="k">def</span> <span class="nf">get_root_bundles</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
    <span class="n">input_node</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span>
    <span class="n">empty_bundle</span> <span class="o">=</span> <span class="p">(</span>
        <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_empty_committed_bundle</span><span class="p">(</span><span class="n">input_node</span><span class="p">))</span>
    <span class="k">return</span> <span class="p">[</span><span class="n">empty_bundle</span><span class="p">]</span></div></div>


<span class="k">class</span> <span class="nc">_TestStreamRootBundleProvider</span><span class="p">(</span><span class="n">RootBundleProvider</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Provides an initial bundle for the TestStream evaluator.</span>
<span class="sd"> This bundle is used as the initial state to the TestStream. Each unprocessed</span>
<span class="sd"> bundle emitted from the TestStream afterwards is its state: index into the</span>
<span class="sd"> stream, and the watermark.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">get_root_bundles</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">test_stream</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span>
<span class="c1"># If there was an endpoint defined then get the events from the</span>
<span class="c1"># TestStreamService.</span>
<span class="k">if</span> <span class="n">test_stream</span><span class="o">.</span><span class="n">endpoint</span><span class="p">:</span>
<span class="n">_TestStreamEvaluator</span><span class="o">.</span><span class="n">event_stream</span> <span class="o">=</span> <span class="n">_TestStream</span><span class="o">.</span><span class="n">events_from_rpc</span><span class="p">(</span>
<span class="n">test_stream</span><span class="o">.</span><span class="n">endpoint</span><span class="p">,</span>
<span class="n">test_stream</span><span class="o">.</span><span class="n">output_tags</span><span class="p">,</span>
<span class="n">test_stream</span><span class="o">.</span><span class="n">coder</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">_TestStreamEvaluator</span><span class="o">.</span><span class="n">event_stream</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">_TestStream</span><span class="o">.</span><span class="n">events_from_script</span><span class="p">(</span><span class="n">test_stream</span><span class="o">.</span><span class="n">_events</span><span class="p">))</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span>
<span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">pipeline</span><span class="p">))</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="sa">b</span><span class="s1">&#39;&#39;</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">=</span><span class="n">MIN_TIMESTAMP</span><span class="p">))</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">commit</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span>
<span class="k">return</span> <span class="p">[</span><span class="n">bundle</span><span class="p">]</span>
<span class="k">class</span> <span class="nc">_TransformEvaluator</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;An evaluator of a specific application of a transform.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span>
<span class="n">evaluation_context</span><span class="p">,</span> <span class="c1"># type: EvaluationContext</span>
<span class="n">applied_ptransform</span><span class="p">,</span> <span class="c1"># type: AppliedPTransform</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span>
<span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span> <span class="o">=</span> <span class="n">evaluation_context</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span> <span class="o">=</span> <span class="n">applied_ptransform</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_input_committed_bundle</span> <span class="o">=</span> <span class="n">input_committed_bundle</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_side_inputs</span> <span class="o">=</span> <span class="n">side_inputs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_expand_outputs</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_execution_context</span> <span class="o">=</span> <span class="n">evaluation_context</span><span class="o">.</span><span class="n">get_execution_context</span><span class="p">(</span>
<span class="n">applied_ptransform</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_execution_context</span><span class="o">.</span><span class="n">get_step_context</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">_expand_outputs</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">outputs</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="k">for</span> <span class="n">pval</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">outputs</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pval</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">DoOutputsTuple</span><span class="p">):</span>
<span class="n">pvals</span> <span class="o">=</span> <span class="p">(</span><span class="n">v</span> <span class="k">for</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">pval</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">pvals</span> <span class="o">=</span> <span class="p">(</span><span class="n">pval</span><span class="p">,</span> <span class="p">)</span>
<span class="k">for</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">pvals</span><span class="p">:</span>
<span class="n">outputs</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">v</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span> <span class="o">=</span> <span class="nb">frozenset</span><span class="p">(</span><span class="n">outputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_split_list_into_bundles</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">output_pcollection</span><span class="p">,</span>
<span class="n">elements</span><span class="p">,</span>
<span class="n">max_element_per_bundle</span><span class="p">,</span>
<span class="n">element_size_fn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Splits elements, an iterable, into multiple output bundles.</span>
<span class="sd"> Args:</span>
<span class="sd"> output_pcollection: PCollection that the elements belong to.</span>
<span class="sd"> elements: elements to be chunked into bundles.</span>
<span class="sd"> max_element_per_bundle: (approximately) the maximum element per bundle.</span>
<span class="sd"> If it is None, only a single bundle will be produced.</span>
<span class="sd"> element_size_fn: Function to return the size of a given element.</span>
<span class="sd"> Returns:</span>
<span class="sd"> List of output uncommitted bundles with at least one bundle.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">output_pcollection</span><span class="p">)</span>
<span class="n">bundle_size</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[</span><span class="n">bundle</span><span class="p">]</span>
<span class="k">for</span> <span class="n">element</span> <span class="ow">in</span> <span class="n">elements</span><span class="p">:</span>
<span class="k">if</span> <span class="n">max_element_per_bundle</span> <span class="ow">and</span> <span class="n">bundle_size</span> <span class="o">&gt;=</span> <span class="n">max_element_per_bundle</span><span class="p">:</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">output_pcollection</span><span class="p">)</span>
<span class="n">bundle_size</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">bundles</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">bundle</span><span class="p">)</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">output</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="n">bundle_size</span> <span class="o">+=</span> <span class="n">element_size_fn</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="k">return</span> <span class="n">bundles</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Starts a new bundle.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">process_timer_wrapper</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timer_firing</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Process timer by clearing and then calling process_timer().</span>
<span class="sd"> This method is called with any timer firing and clears the delivered</span>
<span class="sd"> timer from the keyed state and then calls process_timer(). The default</span>
<span class="sd"> process_timer() implementation emits a KeyedWorkItem for the particular</span>
<span class="sd"> timer and passes it to process_element(). Evaluator subclasses which</span>
<span class="sd"> desire different timer delivery semantics can override process_timer().</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="n">timer_firing</span><span class="o">.</span><span class="n">encoded_key</span><span class="p">)</span>
<span class="n">state</span><span class="o">.</span><span class="n">clear_timer</span><span class="p">(</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">window</span><span class="p">,</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">name</span><span class="p">,</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">time_domain</span><span class="p">,</span>
<span class="n">dynamic_timer_tag</span><span class="o">=</span><span class="n">timer_firing</span><span class="o">.</span><span class="n">dynamic_timer_tag</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">process_timer</span><span class="p">(</span><span class="n">timer_firing</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process_timer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timer_firing</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Default process_timer() impl. generating KeyedWorkItem element.&quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">process_element</span><span class="p">(</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span>
<span class="n">KeyedWorkItem</span><span class="p">(</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">encoded_key</span><span class="p">,</span> <span class="n">timer_firings</span><span class="o">=</span><span class="p">[</span><span class="n">timer_firing</span><span class="p">])))</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Processes a new element as part of the current bundle.&quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="s1">&#39;</span><span class="si">%s</span><span class="s1"> do not process elements.&#39;</span> <span class="o">%</span> <span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; TransformResult</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Finishes the bundle and produces output.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="k">class</span> <span class="nc">_BoundedReadEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;TransformEvaluator for bounded Read transform.&quot;&quot;&quot;</span>
<span class="c1"># After some benchmarks, 1000 was optimal among {100,1000,10000}</span>
<span class="n">MAX_ELEMENT_PER_BUNDLE</span> <span class="o">=</span> <span class="mi">1000</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source</span> <span class="o">=</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">source</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source</span><span class="o">.</span><span class="n">pipeline_options</span> <span class="o">=</span> <span class="n">evaluation_context</span><span class="o">.</span><span class="n">pipeline_options</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">def</span> <span class="nf">_read_values_to_bundles</span><span class="p">(</span><span class="n">reader</span><span class="p">):</span>
<span class="n">read_result</span> <span class="o">=</span> <span class="p">[</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">reader</span><span class="p">]</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_split_list_into_bundles</span><span class="p">(</span>
<span class="n">output_pcollection</span><span class="p">,</span>
<span class="n">read_result</span><span class="p">,</span>
<span class="n">_BoundedReadEvaluator</span><span class="o">.</span><span class="n">MAX_ELEMENT_PER_BUNDLE</span><span class="p">,</span>
<span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="mi">1</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_source</span><span class="p">,</span> <span class="n">io</span><span class="o">.</span><span class="n">iobase</span><span class="o">.</span><span class="n">BoundedSource</span><span class="p">):</span>
<span class="c1"># Getting a RangeTracker for the default range of the source and reading</span>
<span class="c1"># the full source using that.</span>
<span class="n">range_tracker</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source</span><span class="o">.</span><span class="n">get_range_tracker</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="n">reader</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="n">range_tracker</span><span class="p">)</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="n">_read_values_to_bundles</span><span class="p">(</span><span class="n">reader</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source</span><span class="o">.</span><span class="n">reader</span><span class="p">()</span> <span class="k">as</span> <span class="n">reader</span><span class="p">:</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="n">_read_values_to_bundles</span><span class="p">(</span><span class="n">reader</span><span class="p">)</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">bundles</span><span class="p">,</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_WatermarkControllerEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;TransformEvaluator for the _WatermarkController transform.</span>
<span class="sd"> This is used to enable multiple output watermarks for the TestStream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># The state tag used to store the watermark.</span>
<span class="n">WATERMARK_TAG</span> <span class="o">=</span> <span class="n">_ReadModifyWriteStateTag</span><span class="p">(</span>
<span class="s1">&#39;_WatermarkControllerEvaluator_Watermark_Tag&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">transform</span> <span class="o">=</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_init_state</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">_init_state</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Gets and sets the initial state.</span>
<span class="sd"> This is used to keep track of the watermark hold between calls.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">transform_states</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">_transform_keyed_states</span>
<span class="n">state</span> <span class="o">=</span> <span class="n">transform_states</span><span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="p">]</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">WATERMARK_TAG</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">state</span><span class="p">:</span>
<span class="n">watermark_state</span> <span class="o">=</span> <span class="n">InMemoryUnmergedState</span><span class="p">()</span>
<span class="n">watermark_state</span><span class="o">.</span><span class="n">set_global_state</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">WATERMARK_TAG</span><span class="p">,</span> <span class="n">MIN_TIMESTAMP</span><span class="p">)</span>
<span class="n">state</span><span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">WATERMARK_TAG</span><span class="p">]</span> <span class="o">=</span> <span class="n">watermark_state</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_state</span> <span class="o">=</span> <span class="n">state</span><span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">WATERMARK_TAG</span><span class="p">]</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">_watermark</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_state</span><span class="o">.</span><span class="n">get_global_state</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">WATERMARK_TAG</span><span class="p">)</span>
<span class="nd">@_watermark</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">_watermark</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">watermark</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_state</span><span class="o">.</span><span class="n">set_global_state</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">WATERMARK_TAG</span><span class="p">,</span> <span class="n">watermark</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="c1"># In order to keep the order of the elements between the script and what</span>
<span class="c1"># flows through the pipeline the same, emit the elements here.</span>
<span class="n">event</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">event</span><span class="p">,</span> <span class="n">WatermarkEvent</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_watermark</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">new_watermark</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">event</span><span class="p">,</span> <span class="n">ElementEvent</span><span class="p">):</span>
<span class="n">main_output</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">main_output</span><span class="p">)</span>
<span class="k">for</span> <span class="n">tv</span> <span class="ow">in</span> <span class="n">event</span><span class="o">.</span><span class="n">timestamped_values</span><span class="p">:</span>
<span class="c1"># Unreify the value into the correct window.</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">tv</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="n">WindowedValueHolder</span><span class="p">):</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">output</span><span class="p">(</span><span class="n">tv</span><span class="o">.</span><span class="n">value</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">output</span><span class="p">(</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="n">tv</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">=</span><span class="n">tv</span><span class="o">.</span><span class="n">timestamp</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bundles</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">bundle</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># The watermark hold we set here is the way we allow the TestStream events</span>
<span class="c1"># to control the output watermark.</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">bundles</span><span class="p">,</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="p">{</span><span class="kc">None</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_watermark</span><span class="p">})</span>
<span class="k">class</span> <span class="nc">_PairWithTimingEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;TransformEvaluator for the PairWithTiming transform.</span>
<span class="sd"> This transform takes an element as an input and outputs</span>
<span class="sd"> KV(element, `TimingInfo`). Where the `TimingInfo` contains both the</span>
<span class="sd"> processing time timestamp and watermark.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">main_output</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">main_output</span><span class="p">)</span>
<span class="n">watermark_manager</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">_watermark_manager</span>
<span class="n">watermarks</span> <span class="o">=</span> <span class="n">watermark_manager</span><span class="o">.</span><span class="n">get_watermarks</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="p">)</span>
<span class="n">output_watermark</span> <span class="o">=</span> <span class="n">watermarks</span><span class="o">.</span><span class="n">output_watermark</span>
<span class="n">now</span> <span class="o">=</span> <span class="n">Timestamp</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">watermark_manager</span><span class="o">.</span><span class="n">_clock</span><span class="o">.</span><span class="n">time</span><span class="p">())</span>
<span class="bp">self</span><span class="o">.</span><span class="n">timing_info</span> <span class="o">=</span> <span class="n">TimingInfo</span><span class="p">(</span><span class="n">now</span><span class="p">,</span> <span class="n">output_watermark</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">WindowedValue</span><span class="p">((</span><span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">timing_info</span><span class="p">),</span>
<span class="n">element</span><span class="o">.</span><span class="n">timestamp</span><span class="p">,</span>
<span class="n">element</span><span class="o">.</span><span class="n">windows</span><span class="p">,</span>
<span class="n">element</span><span class="o">.</span><span class="n">pane_info</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bundle</span><span class="o">.</span><span class="n">output</span><span class="p">(</span><span class="n">result</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">bundle</span><span class="p">],</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="p">{})</span>
<span class="k">class</span> <span class="nc">_TestStreamEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;TransformEvaluator for the TestStream transform.</span>
<span class="sd"> This evaluator&#39;s responsibility is to retrieve the next event from the</span>
<span class="sd"> _TestStream and either: advance the clock, advance the _TestStream watermark,</span>
<span class="sd"> or pass the event to the _WatermarkController.</span>
<span class="sd"> The _WatermarkController is in charge of emitting the elements to the</span>
<span class="sd"> downstream consumers and setting its own output watermark.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">event_stream</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">test_stream</span> <span class="o">=</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span>
<span class="bp">self</span><span class="o">.</span><span class="n">is_done</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">watermark</span> <span class="o">=</span> <span class="n">MIN_TIMESTAMP</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="c1"># The watermark of the _TestStream transform itself.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">watermark</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">timestamp</span>
<span class="c1"># Set up the correct watermark holds in the Watermark controllers and the</span>
<span class="c1"># TestStream so that the watermarks will not automatically advance to +inf</span>
<span class="c1"># when elements start streaming. This can happen multiple times in the first</span>
<span class="c1"># bundle, but the operations are idempotent and adding state to keep track</span>
<span class="c1"># of this would add unnecessary code complexity.</span>
<span class="n">events</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">watermark</span> <span class="o">==</span> <span class="n">MIN_TIMESTAMP</span><span class="p">:</span>
<span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">test_stream</span><span class="o">.</span><span class="n">_set_up</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">test_stream</span><span class="o">.</span><span class="n">output_tags</span><span class="p">):</span>
<span class="n">events</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">event</span><span class="p">)</span>
<span class="c1"># Retrieve the TestStream&#39;s event stream and read from it.</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">events</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="nb">next</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">event_stream</span><span class="p">))</span>
<span class="k">except</span> <span class="ne">StopIteration</span><span class="p">:</span>
<span class="c1"># Advance the watermarks to +inf to cleanly stop the pipeline.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">is_done</span> <span class="o">=</span> <span class="kc">True</span>
<span class="n">events</span> <span class="o">+=</span> <span class="p">([</span>
<span class="n">e</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">test_stream</span><span class="o">.</span><span class="n">_tear_down</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">test_stream</span><span class="o">.</span><span class="n">output_tags</span><span class="p">)</span>
<span class="p">])</span>
<span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">events</span><span class="p">:</span>
<span class="c1"># We can either have the _TestStream or the _WatermarkController to emit</span>
<span class="c1"># the elements. We chose to emit in the _WatermarkController so that the</span>
<span class="c1"># element is emitted at the correct watermark value.</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">event</span><span class="p">,</span> <span class="p">(</span><span class="n">ElementEvent</span><span class="p">,</span> <span class="n">WatermarkEvent</span><span class="p">)):</span>
<span class="c1"># The WATERMARK_CONTROL_TAG is used to hold the _TestStream&#39;s</span>
<span class="c1"># watermark to -inf, then +inf-1, then +inf. This watermark progression</span>
<span class="c1"># is ultimately used to set up the proper holds to allow the</span>
<span class="c1"># _WatermarkControllers to control their own output watermarks.</span>
<span class="k">if</span> <span class="n">event</span><span class="o">.</span><span class="n">tag</span> <span class="o">==</span> <span class="n">_TestStream</span><span class="o">.</span><span class="n">WATERMARK_CONTROL_TAG</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">watermark</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">new_watermark</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">main_output</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">main_output</span><span class="p">)</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">output</span><span class="p">(</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="n">event</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bundles</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">bundle</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">event</span><span class="p">,</span> <span class="n">ProcessingTimeEvent</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">_watermark_manager</span><span class="o">.</span><span class="n">_clock</span><span class="o">.</span><span class="n">advance_time</span><span class="p">(</span>
<span class="n">event</span><span class="o">.</span><span class="n">advance_by</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Invalid TestStream event: </span><span class="si">%s</span><span class="s1">.&#39;</span> <span class="o">%</span> <span class="n">event</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">unprocessed_bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="c1"># Continue to send its own state to itself via an unprocessed bundle. This</span>
<span class="c1"># acts as a heartbeat, where each element will read the next event from the</span>
<span class="c1"># event stream.</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_done</span><span class="p">:</span>
<span class="n">unprocessed_bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span>
<span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">pipeline</span><span class="p">))</span>
<span class="n">unprocessed_bundle</span><span class="o">.</span><span class="n">add</span><span class="p">(</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="sa">b</span><span class="s1">&#39;&#39;</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">watermark</span><span class="p">))</span>
<span class="n">unprocessed_bundles</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">unprocessed_bundle</span><span class="p">)</span>
<span class="c1"># Returning the watermark in the dict here is used as a watermark hold.</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">bundles</span><span class="p">,</span> <span class="n">unprocessed_bundles</span><span class="p">,</span> <span class="kc">None</span><span class="p">,</span> <span class="p">{</span><span class="kc">None</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">watermark</span><span class="p">})</span>
<span class="k">class</span> <span class="nc">_PubSubReadEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;TransformEvaluator for PubSub read.&quot;&quot;&quot;</span>
<span class="c1"># A mapping of transform to _PubSubSubscriptionWrapper.</span>
<span class="c1"># TODO(https://github.com/apache/beam/issues/19751): Prevents garbage</span>
<span class="c1"># collection of pipeline instances.</span>
<span class="n">_subscription_cache</span> <span class="o">=</span> <span class="p">{}</span> <span class="c1"># type: Dict[AppliedPTransform, str]</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">source</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">_source</span> <span class="c1"># type: _PubSubSource</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">id_label</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span>
<span class="s1">&#39;DirectRunner: id_label is not supported for PubSub reads&#39;</span><span class="p">)</span>
<span class="n">sub_project</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="p">,</span> <span class="s1">&#39;pipeline_options&#39;</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="kn">import</span> <span class="n">GoogleCloudOptions</span>
<span class="n">sub_project</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">pipeline_options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span>
<span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">project</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">sub_project</span><span class="p">:</span>
<span class="n">sub_project</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">project</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sub_name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_subscription</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">project</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">topic_name</span><span class="p">,</span>
<span class="n">sub_project</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">subscription_name</span><span class="p">)</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">get_subscription</span><span class="p">(</span>
<span class="bp">cls</span><span class="p">,</span> <span class="n">transform</span><span class="p">,</span> <span class="n">project</span><span class="p">,</span> <span class="n">short_topic_name</span><span class="p">,</span> <span class="n">sub_project</span><span class="p">,</span> <span class="n">short_sub_name</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">google.cloud</span> <span class="kn">import</span> <span class="n">pubsub</span>
<span class="k">if</span> <span class="n">short_sub_name</span><span class="p">:</span>
<span class="k">return</span> <span class="n">pubsub</span><span class="o">.</span><span class="n">SubscriberClient</span><span class="o">.</span><span class="n">subscription_path</span><span class="p">(</span><span class="n">project</span><span class="p">,</span> <span class="n">short_sub_name</span><span class="p">)</span>
<span class="k">if</span> <span class="n">transform</span> <span class="ow">in</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_subscription_cache</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_subscription_cache</span><span class="p">[</span><span class="n">transform</span><span class="p">]</span>
<span class="n">sub_client</span> <span class="o">=</span> <span class="n">pubsub</span><span class="o">.</span><span class="n">SubscriberClient</span><span class="p">()</span>
<span class="n">sub_name</span> <span class="o">=</span> <span class="n">sub_client</span><span class="o">.</span><span class="n">subscription_path</span><span class="p">(</span>
<span class="n">sub_project</span><span class="p">,</span>
<span class="s1">&#39;beam_</span><span class="si">%d</span><span class="s1">_</span><span class="si">%x</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="nb">int</span><span class="p">(</span><span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()),</span> <span class="n">random</span><span class="o">.</span><span class="n">randrange</span><span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">32</span><span class="p">)))</span>
<span class="n">topic_name</span> <span class="o">=</span> <span class="n">sub_client</span><span class="o">.</span><span class="n">topic_path</span><span class="p">(</span><span class="n">project</span><span class="p">,</span> <span class="n">short_topic_name</span><span class="p">)</span>
<span class="n">sub_client</span><span class="o">.</span><span class="n">create_subscription</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="n">sub_name</span><span class="p">,</span> <span class="n">topic</span><span class="o">=</span><span class="n">topic_name</span><span class="p">)</span>
<span class="n">atexit</span><span class="o">.</span><span class="n">register</span><span class="p">(</span><span class="n">sub_client</span><span class="o">.</span><span class="n">delete_subscription</span><span class="p">,</span> <span class="n">subscription</span><span class="o">=</span><span class="n">sub_name</span><span class="p">)</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_subscription_cache</span><span class="p">[</span><span class="n">transform</span><span class="p">]</span> <span class="o">=</span> <span class="n">sub_name</span>
<span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_subscription_cache</span><span class="p">[</span><span class="n">transform</span><span class="p">]</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">_read_from_pubsub</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timestamp_attribute</span><span class="p">):</span>
<span class="c1"># type: (...) -&gt; List[Tuple[Timestamp, PubsubMessage]]</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.pubsub</span> <span class="kn">import</span> <span class="n">PubsubMessage</span>
<span class="kn">from</span> <span class="nn">google.cloud</span> <span class="kn">import</span> <span class="n">pubsub</span>
<span class="k">def</span> <span class="nf">_get_element</span><span class="p">(</span><span class="n">message</span><span class="p">):</span>
<span class="n">parsed_message</span> <span class="o">=</span> <span class="n">PubsubMessage</span><span class="o">.</span><span class="n">_from_message</span><span class="p">(</span><span class="n">message</span><span class="p">)</span>
<span class="k">if</span> <span class="p">(</span><span class="n">timestamp_attribute</span> <span class="ow">and</span>
<span class="n">timestamp_attribute</span> <span class="ow">in</span> <span class="n">parsed_message</span><span class="o">.</span><span class="n">attributes</span><span class="p">):</span>
<span class="n">rfc3339_or_milli</span> <span class="o">=</span> <span class="n">parsed_message</span><span class="o">.</span><span class="n">attributes</span><span class="p">[</span><span class="n">timestamp_attribute</span><span class="p">]</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">timestamp</span> <span class="o">=</span> <span class="n">Timestamp</span><span class="p">(</span><span class="n">micros</span><span class="o">=</span><span class="nb">int</span><span class="p">(</span><span class="n">rfc3339_or_milli</span><span class="p">)</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">ValueError</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">timestamp</span> <span class="o">=</span> <span class="n">Timestamp</span><span class="o">.</span><span class="n">from_rfc3339</span><span class="p">(</span><span class="n">rfc3339_or_milli</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">ValueError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Bad timestamp value: </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">e</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">if</span> <span class="n">message</span><span class="o">.</span><span class="n">publish_time</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;No publish time present in message: </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">message</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">timestamp</span> <span class="o">=</span> <span class="n">Timestamp</span><span class="o">.</span><span class="n">from_utc_datetime</span><span class="p">(</span><span class="n">message</span><span class="o">.</span><span class="n">publish_time</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">ValueError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Bad timestamp value for message </span><span class="si">%s</span><span class="s1">: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">message</span><span class="p">,</span> <span class="n">e</span><span class="p">)</span>
<span class="k">return</span> <span class="n">timestamp</span><span class="p">,</span> <span class="n">parsed_message</span>
<span class="c1"># Because of the AutoAck, we are not able to reread messages if this</span>
<span class="c1"># evaluator fails with an exception before emitting a bundle. However,</span>
<span class="c1"># the DirectRunner currently doesn&#39;t retry work items anyway, so the</span>
<span class="c1"># pipeline would enter an inconsistent state on any error.</span>
<span class="n">sub_client</span> <span class="o">=</span> <span class="n">pubsub</span><span class="o">.</span><span class="n">SubscriberClient</span><span class="p">()</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">response</span> <span class="o">=</span> <span class="n">sub_client</span><span class="o">.</span><span class="n">pull</span><span class="p">(</span>
<span class="n">subscription</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_sub_name</span><span class="p">,</span> <span class="n">max_messages</span><span class="o">=</span><span class="mi">10</span><span class="p">,</span> <span class="n">timeout</span><span class="o">=</span><span class="mi">30</span><span class="p">)</span>
<span class="n">results</span> <span class="o">=</span> <span class="p">[</span><span class="n">_get_element</span><span class="p">(</span><span class="n">rm</span><span class="o">.</span><span class="n">message</span><span class="p">)</span> <span class="k">for</span> <span class="n">rm</span> <span class="ow">in</span> <span class="n">response</span><span class="o">.</span><span class="n">received_messages</span><span class="p">]</span>
<span class="n">ack_ids</span> <span class="o">=</span> <span class="p">[</span><span class="n">rm</span><span class="o">.</span><span class="n">ack_id</span> <span class="k">for</span> <span class="n">rm</span> <span class="ow">in</span> <span class="n">response</span><span class="o">.</span><span class="n">received_messages</span><span class="p">]</span>
<span class="k">if</span> <span class="n">ack_ids</span><span class="p">:</span>
<span class="n">sub_client</span><span class="o">.</span><span class="n">acknowledge</span><span class="p">(</span><span class="n">subscription</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_sub_name</span><span class="p">,</span> <span class="n">ack_ids</span><span class="o">=</span><span class="n">ack_ids</span><span class="p">)</span>
<span class="k">finally</span><span class="p">:</span>
<span class="n">sub_client</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="k">return</span> <span class="n">results</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; TransformResult</span>
<span class="n">data</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_from_pubsub</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">timestamp_attribute</span><span class="p">)</span>
<span class="k">if</span> <span class="n">data</span><span class="p">:</span>
<span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">output_pcollection</span><span class="p">)</span>
<span class="c1"># TODO(ccy): Respect the PubSub source&#39;s id_label field.</span>
<span class="k">for</span> <span class="n">timestamp</span><span class="p">,</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">data</span><span class="p">:</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">with_attributes</span><span class="p">:</span>
<span class="n">element</span> <span class="o">=</span> <span class="n">message</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">element</span> <span class="o">=</span> <span class="n">message</span><span class="o">.</span><span class="n">data</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">output</span><span class="p">(</span>
<span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">=</span><span class="n">timestamp</span><span class="p">))</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[</span><span class="n">bundle</span><span class="p">]</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">inputs</span><span class="p">:</span>
<span class="n">input_pvalue</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">inputs</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">input_pvalue</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span>
<span class="n">unprocessed_bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">input_pvalue</span><span class="p">)</span>
<span class="c1"># TODO(udim): Correct value for watermark hold.</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">bundles</span><span class="p">,</span> <span class="p">[</span><span class="n">unprocessed_bundle</span><span class="p">],</span>
<span class="kc">None</span><span class="p">,</span> <span class="p">{</span><span class="kc">None</span><span class="p">:</span> <span class="n">Timestamp</span><span class="o">.</span><span class="n">of</span><span class="p">(</span><span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">())})</span>
<span class="k">class</span> <span class="nc">_FlattenEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;TransformEvaluator for Flatten transform.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">output_pcollection</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">bundle</span><span class="o">.</span><span class="n">output</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">bundle</span><span class="p">]</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">bundles</span><span class="p">,</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_ImpulseEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;TransformEvaluator for Impulse transform.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">output_pcollection</span><span class="p">)</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">output</span><span class="p">(</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="sa">b</span><span class="s1">&#39;&#39;</span><span class="p">))</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="p">[</span><span class="n">bundle</span><span class="p">],</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_TaggedReceivers</span><span class="p">(</span><span class="nb">dict</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Receives ParDo output and redirects it to the associated output bundle.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">evaluation_context</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span> <span class="o">=</span> <span class="n">evaluation_context</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_null_receiver</span> <span class="o">=</span> <span class="kc">None</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
<span class="k">class</span> <span class="nc">NullReceiver</span><span class="p">(</span><span class="n">common</span><span class="o">.</span><span class="n">Receiver</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Ignores undeclared outputs, default execution mode.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">receive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="c1"># type: (WindowedValue) -&gt; None</span>
<span class="k">pass</span>
<span class="k">class</span> <span class="nc">_InMemoryReceiver</span><span class="p">(</span><span class="n">common</span><span class="o">.</span><span class="n">Receiver</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Buffers undeclared outputs to the given dictionary.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">target</span><span class="p">,</span> <span class="n">tag</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_target</span> <span class="o">=</span> <span class="n">target</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_tag</span> <span class="o">=</span> <span class="n">tag</span>
<span class="k">def</span> <span class="nf">receive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="c1"># type: (WindowedValue) -&gt; None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_target</span><span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">_tag</span><span class="p">]</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="k">def</span> <span class="fm">__missing__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_null_receiver</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_null_receiver</span> <span class="o">=</span> <span class="n">_TaggedReceivers</span><span class="o">.</span><span class="n">NullReceiver</span><span class="p">()</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_null_receiver</span>
<span class="k">class</span> <span class="nc">_ParDoEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;TransformEvaluator for ParDo transform.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span>
<span class="n">evaluation_context</span><span class="p">,</span> <span class="c1"># type: EvaluationContext</span>
<span class="n">applied_ptransform</span><span class="p">,</span> <span class="c1"># type: AppliedPTransform</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">,</span>
<span class="n">perform_dofn_pickle_test</span><span class="o">=</span><span class="kc">True</span>
<span class="p">):</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="c1"># This is a workaround for SDF implementation. SDF implementation adds state</span>
<span class="c1"># to the SDF that is not picklable.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_perform_dofn_pickle_test</span> <span class="o">=</span> <span class="n">perform_dofn_pickle_test</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">transform</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_tagged_receivers</span> <span class="o">=</span> <span class="n">_TaggedReceivers</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="p">)</span>
<span class="k">for</span> <span class="n">output_tag</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">outputs</span><span class="p">:</span>
<span class="n">output_pcollection</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">tag</span><span class="o">=</span><span class="n">output_tag</span><span class="p">)</span>
<span class="n">output_pcollection</span><span class="o">.</span><span class="n">producer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_tagged_receivers</span><span class="p">[</span><span class="n">output_tag</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="n">output_pcollection</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_tagged_receivers</span><span class="p">[</span><span class="n">output_tag</span><span class="p">]</span><span class="o">.</span><span class="n">tag</span> <span class="o">=</span> <span class="n">output_tag</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_counter_factory</span> <span class="o">=</span> <span class="n">counters</span><span class="o">.</span><span class="n">CounterFactory</span><span class="p">()</span>
<span class="c1"># TODO(aaltay): Consider storing the serialized form as an optimization.</span>
<span class="n">dofn</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">pickler</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">pickler</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="n">transform</span><span class="o">.</span><span class="n">dofn</span><span class="p">))</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_perform_dofn_pickle_test</span> <span class="k">else</span> <span class="n">transform</span><span class="o">.</span><span class="n">dofn</span><span class="p">)</span>
<span class="n">args</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">args</span> <span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="s1">&#39;args&#39;</span><span class="p">)</span> <span class="k">else</span> <span class="p">[]</span>
<span class="n">kwargs</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">kwargs</span> <span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="s1">&#39;kwargs&#39;</span><span class="p">)</span> <span class="k">else</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">user_state_context</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">user_timer_map</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">if</span> <span class="n">is_stateful_dofn</span><span class="p">(</span><span class="n">dofn</span><span class="p">):</span>
<span class="n">kv_type_hint</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">inputs</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">element_type</span>
<span class="k">if</span> <span class="n">kv_type_hint</span> <span class="ow">and</span> <span class="n">kv_type_hint</span> <span class="o">!=</span> <span class="n">Any</span><span class="p">:</span>
<span class="n">coder</span> <span class="o">=</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="n">kv_type_hint</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span> <span class="o">=</span> <span class="n">coder</span><span class="o">.</span><span class="n">key_coder</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span> <span class="o">=</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="n">Any</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">user_state_context</span> <span class="o">=</span> <span class="n">DirectUserStateContext</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="p">,</span> <span class="n">dofn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span><span class="p">)</span>
<span class="n">_</span><span class="p">,</span> <span class="n">all_timer_specs</span> <span class="o">=</span> <span class="n">get_dofn_specs</span><span class="p">(</span><span class="n">dofn</span><span class="p">)</span>
<span class="k">for</span> <span class="n">timer_spec</span> <span class="ow">in</span> <span class="n">all_timer_specs</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">user_timer_map</span><span class="p">[</span><span class="s1">&#39;user/</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">timer_spec</span><span class="o">.</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">timer_spec</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span> <span class="o">=</span> <span class="n">DoFnRunner</span><span class="p">(</span>
<span class="n">dofn</span><span class="p">,</span>
<span class="n">args</span><span class="p">,</span>
<span class="n">kwargs</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_side_inputs</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">inputs</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">windowing</span><span class="p">,</span>
<span class="n">tagged_receivers</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_tagged_receivers</span><span class="p">,</span>
<span class="n">step_name</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">full_label</span><span class="p">,</span>
<span class="n">state</span><span class="o">=</span><span class="n">DoFnState</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_counter_factory</span><span class="p">),</span>
<span class="n">user_state_context</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">user_state_context</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">setup</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">process_timer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timer_firing</span><span class="p">):</span>
<span class="k">if</span> <span class="n">timer_firing</span><span class="o">.</span><span class="n">name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">user_timer_map</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s1">&#39;Unknown timer fired: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">timer_firing</span><span class="p">)</span>
<span class="n">timer_spec</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">user_timer_map</span><span class="p">[</span><span class="n">timer_firing</span><span class="o">.</span><span class="n">name</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">process_user_timer</span><span class="p">(</span>
<span class="n">timer_spec</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="n">timer_firing</span><span class="o">.</span><span class="n">encoded_key</span><span class="p">),</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">window</span><span class="p">,</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">timestamp</span><span class="p">,</span>
<span class="c1"># TODO Add paneinfo to timer_firing in DirectRunner</span>
<span class="kc">None</span><span class="p">,</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">dynamic_timer_tag</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">process</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">finish</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">teardown</span><span class="p">()</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_tagged_receivers</span><span class="o">.</span><span class="n">values</span><span class="p">())</span>
<span class="n">result_counters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_counter_factory</span><span class="o">.</span><span class="n">get_counters</span><span class="p">()</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">user_state_context</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">user_state_context</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">user_state_context</span><span class="o">.</span><span class="n">reset</span><span class="p">()</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">bundles</span><span class="p">,</span> <span class="p">[],</span> <span class="n">result_counters</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_GroupByKeyOnlyEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;TransformEvaluator for _GroupByKeyOnly transform.&quot;&quot;&quot;</span>
<span class="n">MAX_ELEMENT_PER_BUNDLE</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">ELEMENTS_TAG</span> <span class="o">=</span> <span class="n">_ListStateTag</span><span class="p">(</span><span class="s1">&#39;elements&#39;</span><span class="p">)</span>
<span class="n">COMPLETION_TAG</span> <span class="o">=</span> <span class="n">_CombiningValueStateTag</span><span class="p">(</span><span class="s1">&#39;completed&#39;</span><span class="p">,</span> <span class="nb">any</span><span class="p">)</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_is_final_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_execution_context</span><span class="o">.</span><span class="n">watermarks</span><span class="o">.</span><span class="n">input_watermark</span> <span class="o">==</span>
<span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">global_state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="bp">self</span><span class="o">.</span><span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="c1"># The output type of a GroupByKey will be Tuple[Any, Any] or more specific.</span>
<span class="c1"># TODO(https://github.com/apache/beam/issues/18490): Infer coders earlier.</span>
<span class="n">kv_type_hint</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">outputs</span><span class="p">[</span><span class="kc">None</span><span class="p">]</span><span class="o">.</span><span class="n">element_type</span> <span class="ow">or</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">get_type_hints</span><span class="p">()</span><span class="o">.</span><span class="n">input_types</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">])</span>
<span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span> <span class="o">=</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="n">kv_type_hint</span><span class="o">.</span><span class="n">tuple_types</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="k">def</span> <span class="nf">process_timer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timer_firing</span><span class="p">):</span>
<span class="c1"># We do not need to emit a KeyedWorkItem to process_element().</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span><!-- Buffers one input element into per-key state. Raises TypeCheckError unless the element is a WindowedValue holding a 2-item iterable (key, value). -->
<span class="k">assert</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">get_state</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="n">_GroupByKeyOnlyEvaluator</span><span class="o">.</span><span class="n">COMPLETION_TAG</span><span class="p">)</span><!-- Guard: no element may arrive after finish_bundle has set COMPLETION_TAG, i.e. after the grouped output was emitted. -->
<span class="k">if</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">WindowedValue</span><span class="p">)</span> <span class="ow">and</span>
<span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="n">abc</span><span class="o">.</span><span class="n">Iterable</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">len</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">)</span> <span class="o">==</span> <span class="mi">2</span><span class="p">):</span><!-- NOTE(review): the streaming evaluator spells this check collections.abc.Iterable; confirm that abc here refers to collections.abc. -->
<span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span>
<span class="n">encoded_k</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">k</span><span class="p">)</span><!-- Keyed state is indexed by the coder-encoded key, not the raw key. -->
<span class="n">state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="n">encoded_k</span><span class="p">)</span>
<span class="n">state</span><span class="o">.</span><span class="n">add_state</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">_GroupByKeyOnlyEvaluator</span><span class="o">.</span><span class="n">ELEMENTS_TAG</span><span class="p">,</span> <span class="n">v</span><span class="p">)</span><!-- Adds v to this key's ELEMENTS_TAG state; finish_bundle reads all of a key's values back from this tag. -->
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">TypeCheckError</span><span class="p">(</span>
<span class="s1">&#39;Input to _GroupByKeyOnly must be a PCollection of &#39;</span>
<span class="s1">&#39;windowed key-value pairs. Instead received: </span><span class="si">%r</span><span class="s1">.&#39;</span> <span class="o">%</span> <span class="n">element</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- Emits the grouped output at most once, on the final bundle; otherwise emits nothing and holds the watermark. Returns a TransformResult whose holds dict is keyed by None (the global-state key). -->
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_is_final_bundle</span><span class="p">():</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">get_state</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span>
<span class="n">_GroupByKeyOnlyEvaluator</span><span class="o">.</span><span class="n">COMPLETION_TAG</span><span class="p">):</span><!-- Output was already emitted by an earlier final bundle. -->
<span class="c1"># Ignore empty bundles after emitting output. (This may happen because</span>
<span class="c1"># empty bundles do not affect input watermarks.)</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">gbk_result</span> <span class="o">=</span> <span class="p">[]</span>
<span class="c1"># TODO(ccy): perhaps we can clean this up to not use this</span>
<span class="c1"># internal attribute of the DirectStepContext.</span>
<span class="k">for</span> <span class="n">encoded_k</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">existing_keyed_state</span><span class="p">:</span><!-- Visits every key that has state for this step, including the None key of the global state. -->
<span class="c1"># Ignore global state.</span>
<span class="k">if</span> <span class="n">encoded_k</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">continue</span>
<span class="n">k</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="n">encoded_k</span><span class="p">)</span>
<span class="n">state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="n">encoded_k</span><span class="p">)</span>
<span class="n">vs</span> <span class="o">=</span> <span class="n">state</span><span class="o">.</span><span class="n">get_state</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">_GroupByKeyOnlyEvaluator</span><span class="o">.</span><span class="n">ELEMENTS_TAG</span><span class="p">)</span>
<span class="n">gbk_result</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">((</span><span class="n">k</span><span class="p">,</span> <span class="n">vs</span><span class="p">)))</span><!-- One (decoded key, buffered values) pair per key, placed in the global window. -->
<span class="k">def</span> <span class="nf">len_element_fn</span><span class="p">(</span><span class="n">element</span><span class="p">):</span>
<span class="n">_</span><span class="p">,</span> <span class="n">v</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span>
<span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="n">v</span><span class="p">)</span><!-- Size of an output element is its number of grouped values; presumably _split_list_into_bundles weighs elements with it against MAX_ELEMENT_PER_BUNDLE. TODO confirm. -->
<span class="n">bundles</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_split_list_into_bundles</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">output_pcollection</span><span class="p">,</span>
<span class="n">gbk_result</span><span class="p">,</span>
<span class="n">_GroupByKeyOnlyEvaluator</span><span class="o">.</span><span class="n">MAX_ELEMENT_PER_BUNDLE</span><span class="p">,</span>
<span class="n">len_element_fn</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">add_state</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="n">_GroupByKeyOnlyEvaluator</span><span class="o">.</span><span class="n">COMPLETION_TAG</span><span class="p">,</span> <span class="kc">True</span><span class="p">)</span><!-- Records completion: later final bundles emit nothing, and process_element asserts on new input. -->
<span class="n">hold</span> <span class="o">=</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span><!-- Final bundle: the hold is positive infinity. -->
<span class="k">else</span><span class="p">:</span>
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">hold</span> <span class="o">=</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_NEG_INF</span><!-- Not final yet: hold at negative infinity; presumably keeps the output watermark from advancing until the grouped output is emitted. -->
<span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">set_timer</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">,</span> <span class="n">TimeDomain</span><span class="o">.</span><span class="n">WATERMARK</span><span class="p">,</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span><span class="p">)</span><!-- Watermark timer at positive infinity on the global state; presumably so this step is evaluated again once the input watermark reaches infinity. TODO confirm. -->
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">bundles</span><span class="p">,</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="p">{</span><span class="kc">None</span><span class="p">:</span> <span class="n">hold</span><span class="p">})</span>
<span class="k">class</span> <span class="nc">_StreamingGroupByKeyOnlyEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span><!-- Streaming group-by-key-only: each input bundle is grouped by encoded key and emitted immediately as KeyedWorkItems instead of being buffered until the end of input. -->
<span class="w"> </span><span class="sd">&quot;&quot;&quot;TransformEvaluator for _StreamingGroupByKeyOnly transform.</span>
<span class="sd"> The _GroupByKeyOnlyEvaluator buffers elements until its input watermark goes</span>
<span class="sd"> to infinity, which is suitable for batch mode execution. During streaming</span>
<span class="sd"> mode execution, we emit each bundle as it comes to the next transform.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">MAX_ELEMENT_PER_BUNDLE</span> <span class="o">=</span> <span class="kc">None</span><!-- NOTE(review): not referenced by any method of this class; confirm whether other code reads it. -->
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span><!-- This transform accepts no side inputs; everything else is delegated to the base evaluator. -->
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- Resets the per-bundle buffer and resolves the single output PCollection and the key coder. -->
<span class="bp">self</span><span class="o">.</span><span class="n">gbk_items</span> <span class="o">=</span> <span class="n">collections</span><span class="o">.</span><span class="n">defaultdict</span><span class="p">(</span><span class="nb">list</span><span class="p">)</span><!-- Maps coder-encoded key to the list of values seen for it in this bundle. -->
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="bp">self</span><span class="o">.</span><span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="c1"># The input type of a GroupByKey will be Tuple[Any, Any] or more specific.</span>
<span class="n">kv_type_hint</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">inputs</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">element_type</span>
<span class="n">key_type_hint</span> <span class="o">=</span> <span class="p">(</span><span class="n">kv_type_hint</span><span class="o">.</span><span class="n">tuple_types</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="k">if</span> <span class="n">kv_type_hint</span> <span class="k">else</span> <span class="n">Any</span><span class="p">)</span><!-- Falls back to Any when the input has no (truthy) element type hint. -->
<span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span> <span class="o">=</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="n">key_type_hint</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span><!-- Validates the element as a windowed 2-item (key, value) iterable and buffers v under its encoded key; raises TypeCheckError otherwise. -->
<span class="k">if</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">WindowedValue</span><span class="p">)</span> <span class="ow">and</span>
<span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="n">collections</span><span class="o">.</span><span class="n">abc</span><span class="o">.</span><span class="n">Iterable</span><span class="p">)</span> <span class="ow">and</span>
<span class="nb">len</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">)</span> <span class="o">==</span> <span class="mi">2</span><span class="p">):</span>
<span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gbk_items</span><span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="n">k</span><span class="p">)]</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">v</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">TypeCheckError</span><span class="p">(</span>
<span class="s1">&#39;Input to _GroupByKeyOnly must be a PCollection of &#39;</span>
<span class="s1">&#39;windowed key-value pairs. Instead received: </span><span class="si">%r</span><span class="s1">.&#39;</span> <span class="o">%</span> <span class="n">element</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- Emits one KeyedWorkItem per encoded key. All of them go into a single output bundle, which is created only if at least one key was buffered. -->
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">for</span> <span class="n">encoded_k</span><span class="p">,</span> <span class="n">vs</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">gbk_items</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">bundle</span><span class="p">:</span><!-- NOTE(review): relies on the bundle object's truthiness; an explicit is None check would be clearer. -->
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">output_pcollection</span><span class="p">)</span>
<span class="n">bundles</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">bundle</span><span class="p">)</span>
<span class="n">kwi</span> <span class="o">=</span> <span class="n">KeyedWorkItem</span><span class="p">(</span><span class="n">encoded_k</span><span class="p">,</span> <span class="n">elements</span><span class="o">=</span><span class="n">vs</span><span class="p">)</span><!-- The key stays encoded; downstream decodes it. -->
<span class="n">bundle</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">GlobalWindows</span><span class="o">.</span><span class="n">windowed_value</span><span class="p">(</span><span class="n">kwi</span><span class="p">))</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">bundles</span><span class="p">,</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span><!-- The last argument is None, where other evaluators pass a holds dict; presumably this reports no watermark holds. TODO confirm. -->
<span class="k">class</span> <span class="nc">_StreamingGroupAlsoByWindowEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span><!-- Runs windowing and trigger logic per key over incoming KeyedWorkItems through a trigger driver, and reports per-key watermark holds. -->
<span class="w"> </span><span class="sd">&quot;&quot;&quot;TransformEvaluator for the _StreamingGroupAlsoByWindow transform.</span>
<span class="sd"> This evaluator is only used in streaming mode. In batch mode, the</span>
<span class="sd"> GroupAlsoByWindow operation is evaluated as a normal DoFn, as defined</span>
<span class="sd"> in transforms/core.py.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span><!-- This transform accepts no side inputs; everything else is delegated to the base evaluator. -->
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- Per-bundle setup: the single output PCollection, a trigger driver for the transform's windowing, the output buffer, the per-key holds and the key coder. -->
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="bp">self</span><span class="o">.</span><span class="n">output_pcollection</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_outputs</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">driver</span> <span class="o">=</span> <span class="n">create_trigger_driver</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">windowing</span><span class="p">,</span>
<span class="n">clock</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">_watermark_manager</span><span class="o">.</span><span class="n">_clock</span><span class="p">)</span><!-- NOTE(review): reads the watermark manager's private _clock attribute. -->
<span class="bp">self</span><span class="o">.</span><span class="n">gabw_items</span> <span class="o">=</span> <span class="p">[]</span><!-- Windowed (key, value) outputs accumulated during this bundle. -->
<span class="bp">self</span><span class="o">.</span><span class="n">keyed_holds</span> <span class="o">=</span> <span class="p">{}</span><!-- Maps encoded key to that key's earliest state hold; returned from finish_bundle. -->
<span class="c1"># The input type (which is the same as the output type) of a</span>
<span class="c1"># GroupAlsoByWindow will be Tuple[Any, Iter[Any]] or more specific.</span>
<span class="n">kv_type_hint</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="o">.</span><span class="n">outputs</span><span class="p">[</span><span class="kc">None</span><span class="p">]</span><span class="o">.</span><span class="n">element_type</span>
<span class="n">key_type_hint</span> <span class="o">=</span> <span class="p">(</span><span class="n">kv_type_hint</span><span class="o">.</span><span class="n">tuple_types</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="k">if</span> <span class="n">kv_type_hint</span> <span class="k">else</span> <span class="n">Any</span><span class="p">)</span><!-- Falls back to Any when the output has no (truthy) element type hint. -->
<span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span> <span class="o">=</span> <span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">get_coder</span><span class="p">(</span><span class="n">key_type_hint</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span><!-- Processes one KeyedWorkItem: fires its timers first, then feeds its elements (if any) to the trigger driver, then records the key's earliest hold. -->
<span class="n">kwi</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">kwi</span><span class="p">,</span> <span class="n">KeyedWorkItem</span><span class="p">),</span> <span class="n">kwi</span>
<span class="n">encoded_k</span><span class="p">,</span> <span class="n">timer_firings</span><span class="p">,</span> <span class="n">vs</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">kwi</span><span class="o">.</span><span class="n">encoded_key</span><span class="p">,</span> <span class="n">kwi</span><span class="o">.</span><span class="n">timer_firings</span><span class="p">,</span> <span class="n">kwi</span><span class="o">.</span><span class="n">elements</span><span class="p">)</span>
<span class="n">k</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">key_coder</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="n">encoded_k</span><span class="p">)</span>
<span class="n">state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="n">encoded_k</span><span class="p">)</span>
<span class="n">watermarks</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">_watermark_manager</span><span class="o">.</span><span class="n">get_watermarks</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_applied_ptransform</span><span class="p">)</span><!-- Current input and output watermarks of this transform, passed to the driver below. -->
<span class="k">for</span> <span class="n">timer_firing</span> <span class="ow">in</span> <span class="n">timer_firings</span><span class="p">:</span>
<span class="k">for</span> <span class="n">wvalue</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">driver</span><span class="o">.</span><span class="n">process_timer</span><span class="p">(</span><span class="n">timer_firing</span><span class="o">.</span><span class="n">window</span><span class="p">,</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">name</span><span class="p">,</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">time_domain</span><span class="p">,</span>
<span class="n">timer_firing</span><span class="o">.</span><span class="n">timestamp</span><span class="p">,</span>
<span class="n">state</span><span class="p">,</span>
<span class="n">watermarks</span><span class="o">.</span><span class="n">input_watermark</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gabw_items</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">wvalue</span><span class="o">.</span><span class="n">with_value</span><span class="p">((</span><span class="n">k</span><span class="p">,</span> <span class="n">wvalue</span><span class="o">.</span><span class="n">value</span><span class="p">)))</span><!-- Re-attaches the decoded key to each driver output value. -->
<span class="k">if</span> <span class="n">vs</span><span class="p">:</span>
<span class="k">for</span> <span class="n">wvalue</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">driver</span><span class="o">.</span><span class="n">process_elements</span><span class="p">(</span><span class="n">state</span><span class="p">,</span>
<span class="n">vs</span><span class="p">,</span>
<span class="n">watermarks</span><span class="o">.</span><span class="n">output_watermark</span><span class="p">,</span>
<span class="n">watermarks</span><span class="o">.</span><span class="n">input_watermark</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gabw_items</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">wvalue</span><span class="o">.</span><span class="n">with_value</span><span class="p">((</span><span class="n">k</span><span class="p">,</span> <span class="n">wvalue</span><span class="o">.</span><span class="n">value</span><span class="p">)))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">keyed_holds</span><span class="p">[</span><span class="n">encoded_k</span><span class="p">]</span> <span class="o">=</span> <span class="n">state</span><span class="o">.</span><span class="n">get_earliest_hold</span><span class="p">()</span><!-- Overwritten on every work item for this key, so the value after the last item is the one reported. -->
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- Emits all buffered outputs in one bundle (no bundle if nothing was produced) and reports the per-key holds. -->
<span class="n">bundles</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">gabw_items</span><span class="p">:</span>
<span class="n">bundle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">create_bundle</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">output_pcollection</span><span class="p">)</span>
<span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">gabw_items</span><span class="p">:</span>
<span class="n">bundle</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">item</span><span class="p">)</span>
<span class="n">bundles</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">bundle</span><span class="p">)</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">bundles</span><span class="p">,</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">keyed_holds</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_NativeWriteEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;TransformEvaluator for _NativeWrite transform.&quot;&quot;&quot;</span>
<span class="n">ELEMENTS_TAG</span> <span class="o">=</span> <span class="n">_ListStateTag</span><span class="p">(</span><span class="s1">&#39;elements&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><!-- Rejects side inputs, delegates to the base evaluator, and keeps the transform's sink, which finish_bundle writes to. -->
<span class="bp">self</span><span class="p">,</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">):</span>
<span class="k">assert</span> <span class="ow">not</span> <span class="n">side_inputs</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="k">assert</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">sink</span><!-- A _NativeWrite transform must carry a (truthy) sink. -->
<span class="bp">self</span><span class="o">.</span><span class="n">_sink</span> <span class="o">=</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">sink</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">_is_final_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- True once this step's input watermark has reached positive infinity. -->
<span class="k">return</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_execution_context</span><span class="o">.</span><span class="n">watermarks</span><span class="o">.</span><span class="n">input_watermark</span> <span class="o">==</span>
<span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span><span class="p">)</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">_has_already_produced_output</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- True once this step's output watermark has reached positive infinity; presumably that only happens after finish_bundle has written the output and set its hold to positive infinity. TODO confirm. -->
<span class="k">return</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_execution_context</span><span class="o">.</span><span class="n">watermarks</span><span class="o">.</span><span class="n">output_watermark</span> <span class="o">==</span>
<span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- All buffered elements live in the global (None-keyed) state for this step. -->
<span class="bp">self</span><span class="o">.</span><span class="n">global_state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process_timer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timer_firing</span><span class="p">):</span><!-- No-op: timer firings are ignored by this evaluator (the body is only pass). -->
<span class="c1"># We do not need to emit a KeyedWorkItem to process_element().</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span><!-- Buffers the whole element (not just its value) in the global ELEMENTS_TAG list state; finish_bundle later writes each element's .value to the sink. -->
<span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">add_state</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="n">_NativeWriteEvaluator</span><span class="o">.</span><span class="n">ELEMENTS_TAG</span><span class="p">,</span> <span class="n">element</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># finish_bundle will append incoming bundles in memory until all the bundles</span>
<span class="c1"># carrying data is processed. This is done to produce only a single output</span>
<span class="c1"># shard (some tests depends on this behavior). It is possible to have</span>
<span class="c1"># incoming empty bundles after the output is produced, these bundles will be</span>
<span class="c1"># ignored and would not generate additional output files.</span>
<span class="c1"># TODO(altay): Do not wait until the last bundle to write in a single shard.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_is_final_bundle</span><span class="p">:</span>
<span class="n">elements</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">get_state</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="n">_NativeWriteEvaluator</span><span class="o">.</span><span class="n">ELEMENTS_TAG</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_has_already_produced_output</span><span class="p">:</span>
<span class="c1"># Ignore empty bundles that arrive after the output is produced.</span>
<span class="k">assert</span> <span class="n">elements</span> <span class="o">==</span> <span class="p">[]</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sink</span><span class="o">.</span><span class="n">pipeline_options</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_evaluation_context</span><span class="o">.</span><span class="n">pipeline_options</span>
<span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sink</span><span class="o">.</span><span class="n">writer</span><span class="p">()</span> <span class="k">as</span> <span class="n">writer</span><span class="p">:</span>
<span class="k">for</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">elements</span><span class="p">:</span>
<span class="n">writer</span><span class="o">.</span><span class="n">Write</span><span class="p">(</span><span class="n">v</span><span class="o">.</span><span class="n">value</span><span class="p">)</span>
<span class="n">hold</span> <span class="o">=</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">hold</span> <span class="o">=</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_NEG_INF</span>
<span class="bp">self</span><span class="o">.</span><span class="n">global_state</span><span class="o">.</span><span class="n">set_timer</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">,</span> <span class="n">TimeDomain</span><span class="o">.</span><span class="n">WATERMARK</span><span class="p">,</span> <span class="n">WatermarkManager</span><span class="o">.</span><span class="n">WATERMARK_POS_INF</span><span class="p">)</span>
<span class="k">return</span> <span class="n">TransformResult</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="p">[],</span> <span class="p">[],</span> <span class="kc">None</span><span class="p">,</span> <span class="p">{</span><span class="kc">None</span><span class="p">:</span> <span class="n">hold</span><span class="p">})</span>
<span class="k">class</span> <span class="nc">_ProcessElementsEvaluator</span><span class="p">(</span><span class="n">_TransformEvaluator</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;An evaluator for sdf_direct_runner.ProcessElements transform.&quot;&quot;&quot;</span>
<span class="c1"># Maximum number of elements that will be produced by a Splittable DoFn before</span>
<span class="c1"># a checkpoint is requested by the runner.</span>
<span class="n">DEFAULT_MAX_NUM_OUTPUTS</span> <span class="o">=</span> <span class="kc">None</span>
<span class="c1"># Maximum duration a Splittable DoFn will process an element before a</span>
<span class="c1"># checkpoint is requested by the runner.</span>
<span class="n">DEFAULT_MAX_DURATION</span> <span class="o">=</span> <span class="mi">1</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">):</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">)</span>
<span class="n">process_elements_transform</span> <span class="o">=</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">process_elements_transform</span><span class="p">,</span> <span class="n">ProcessElements</span><span class="p">)</span>
<span class="c1"># Replacing the do_fn of the transform with a wrapper do_fn that performs</span>
<span class="c1"># SDF magic.</span>
<span class="n">transform</span> <span class="o">=</span> <span class="n">applied_ptransform</span><span class="o">.</span><span class="n">transform</span>
<span class="n">sdf</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">sdf</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_process_fn</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">new_process_fn</span><span class="p">(</span><span class="n">sdf</span><span class="p">)</span>
<span class="n">transform</span><span class="o">.</span><span class="n">dofn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_process_fn</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_process_fn</span><span class="p">,</span> <span class="n">ProcessFn</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_process_fn</span><span class="o">.</span><span class="n">step_context</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span>
<span class="n">process_element_invoker</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">SDFProcessElementInvoker</span><span class="p">(</span>
<span class="n">max_num_outputs</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">DEFAULT_MAX_NUM_OUTPUTS</span><span class="p">,</span>
<span class="n">max_duration</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">DEFAULT_MAX_DURATION</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_process_fn</span><span class="o">.</span><span class="n">set_process_element_invoker</span><span class="p">(</span><span class="n">process_element_invoker</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_par_do_evaluator</span> <span class="o">=</span> <span class="n">_ParDoEvaluator</span><span class="p">(</span>
<span class="n">evaluation_context</span><span class="p">,</span>
<span class="n">applied_ptransform</span><span class="p">,</span>
<span class="n">input_committed_bundle</span><span class="p">,</span>
<span class="n">side_inputs</span><span class="p">,</span>
<span class="n">perform_dofn_pickle_test</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">keyed_holds</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_par_do_evaluator</span><span class="o">.</span><span class="n">start_bundle</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">process_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="p">,</span> <span class="n">WindowedValue</span><span class="p">)</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">windows</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="n">window</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">windows</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="n">KeyedWorkItem</span><span class="p">):</span>
<span class="n">key</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="o">.</span><span class="n">encoded_key</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># If not a `KeyedWorkItem`, this must be a tuple where key is a randomly</span>
<span class="c1"># generated key and the value is a `WindowedValue` that contains an</span>
<span class="c1"># `ElementAndRestriction` object.</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">)</span>
<span class="n">key</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">value</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_par_do_evaluator</span><span class="o">.</span><span class="n">process_element</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="n">state</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_step_context</span><span class="o">.</span><span class="n">get_keyed_state</span><span class="p">(</span><span class="n">key</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">keyed_holds</span><span class="p">[</span><span class="n">key</span><span class="p">]</span> <span class="o">=</span> <span class="n">state</span><span class="o">.</span><span class="n">get_state</span><span class="p">(</span>
<span class="n">window</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_process_fn</span><span class="o">.</span><span class="n">watermark_hold_tag</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">par_do_result</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_par_do_evaluator</span><span class="o">.</span><span class="n">finish_bundle</span><span class="p">()</span>
<span class="n">transform_result</span> <span class="o">=</span> <span class="n">TransformResult</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">par_do_result</span><span class="o">.</span><span class="n">uncommitted_output_bundles</span><span class="p">,</span>
<span class="n">par_do_result</span><span class="o">.</span><span class="n">unprocessed_bundles</span><span class="p">,</span>
<span class="n">par_do_result</span><span class="o">.</span><span class="n">counters</span><span class="p">,</span>
<span class="n">par_do_result</span><span class="o">.</span><span class="n">keyed_watermark_holds</span><span class="p">,</span>
<span class="n">par_do_result</span><span class="o">.</span><span class="n">undeclared_tag_values</span><span class="p">)</span>
<span class="k">for</span> <span class="n">key</span><span class="p">,</span> <span class="n">keyed_hold</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">keyed_holds</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="n">transform_result</span><span class="o">.</span><span class="n">keyed_watermark_holds</span><span class="p">[</span><span class="n">key</span><span class="p">]</span> <span class="o">=</span> <span class="n">keyed_hold</span>
<span class="k">return</span> <span class="n">transform_result</span>
</pre></div>
</div>
</div>
<footer>
<hr>
<div role="contentinfo">
<p>
<!-- NOTE(review): no copyright holder follows "Copyright"; presumably the Sphinx `copyright` setting is empty — confirm. -->
&copy; Copyright
</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
// Once the DOM is ready (jQuery(fn) shorthand), turn on the Read the Docs
// theme's navigation handlers from theme.js. The `true` argument is passed
// straight through (presumably enables sticky navigation — confirm in theme.js).
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>