blob: df755d035bfc27eee5a064df15beb843eb141b33 [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>apache_beam.pipeline &mdash; Apache Beam documentation</title>
<link rel="stylesheet" href="../../_static/css/theme.css" type="text/css" />
<link rel="index" title="Index"
href="../../genindex.html"/>
<link rel="search" title="Search" href="../../search.html"/>
<link rel="top" title="Apache Beam documentation" href="../../index.html"/>
<link rel="up" title="Module code" href="../index.html"/>
<script src="../../_static/js/modernizr.min.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search">
<a href="../../index.html" class="icon icon-home"> Apache Beam
</a>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.internal.html">apache_beam.internal package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.testing.html">apache_beam.testing package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.tools.html">apache_beam.tools package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.utils.html">apache_beam.utils package</a></li>
</ul>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../apache_beam.version.html">apache_beam.version module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../index.html">Docs</a> &raquo;</li>
<li><a href="../index.html">Module code</a> &raquo;</li>
<li>apache_beam.pipeline</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.pipeline</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Pipeline, the top-level Beam object.</span>
<span class="sd">A pipeline holds a DAG of data transforms. Conceptually the nodes of the DAG</span>
<span class="sd">are transforms (:class:`~apache_beam.transforms.ptransform.PTransform` objects)</span>
<span class="sd">and the edges are values (mostly :class:`~apache_beam.pvalue.PCollection`</span>
<span class="sd">objects). The transforms take as inputs one or more PValues and output one or</span>
<span class="sd">more :class:`~apache_beam.pvalue.PValue` s.</span>
<span class="sd">The pipeline offers functionality to traverse the graph. The actual operation</span>
<span class="sd">to be executed for each node visited is specified through a runner object.</span>
<span class="sd">Typical usage::</span>
<span class="sd"> # Create a pipeline object using a local runner for execution.</span>
<span class="sd"> with beam.Pipeline(&#39;DirectRunner&#39;) as p:</span>
<span class="sd"> # Add to the pipeline a &quot;Create&quot; transform. When executed this</span>
<span class="sd"> # transform will produce a PCollection object with the specified values.</span>
<span class="sd"> pcoll = p | &#39;Create&#39; &gt;&gt; beam.Create([1, 2, 3])</span>
<span class="sd"> # Another transform could be applied to pcoll, e.g., writing to a text file.</span>
<span class="sd"> # For other transforms, refer to transforms/ directory.</span>
<span class="sd"> pcoll | &#39;Write&#39; &gt;&gt; beam.io.WriteToText(&#39;./output&#39;)</span>
<span class="sd"> # run() will execute the DAG stored in the pipeline. The execution of the</span>
<span class="sd"> # nodes visited is done using the specified local runner.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">absolute_import</span>
<span class="kn">import</span> <span class="nn">abc</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">os</span>
<span class="kn">import</span> <span class="nn">re</span>
<span class="kn">import</span> <span class="nn">shutil</span>
<span class="kn">import</span> <span class="nn">tempfile</span>
<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">object</span>
<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">zip</span>
<span class="kn">from</span> <span class="nn">future.utils</span> <span class="k">import</span> <span class="n">with_metaclass</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="k">import</span> <span class="n">pvalue</span>
<span class="kn">from</span> <span class="nn">apache_beam.internal</span> <span class="k">import</span> <span class="n">pickler</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.filesystems</span> <span class="k">import</span> <span class="n">FileSystems</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="k">import</span> <span class="n">DebugOptions</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="k">import</span> <span class="n">PipelineOptions</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="k">import</span> <span class="n">SetupOptions</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="k">import</span> <span class="n">StandardOptions</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="k">import</span> <span class="n">TypeOptions</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options_validator</span> <span class="k">import</span> <span class="n">PipelineOptionsValidator</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability</span> <span class="k">import</span> <span class="n">common_urns</span>
<span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="k">import</span> <span class="n">PCollection</span>
<span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="k">import</span> <span class="n">PDone</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners</span> <span class="k">import</span> <span class="n">PipelineRunner</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners</span> <span class="k">import</span> <span class="n">create_runner</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="k">import</span> <span class="n">ptransform</span>
<span class="c1">#from apache_beam.transforms import external</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="k">import</span> <span class="n">TypeCheckError</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="k">import</span> <span class="n">typehints</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils.annotations</span> <span class="k">import</span> <span class="n">deprecated</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;Pipeline&#39;</span><span class="p">,</span> <span class="s1">&#39;PTransformOverride&#39;</span><span class="p">]</span>
<div class="viewcode-block" id="Pipeline"><a class="viewcode-back" href="../../apache_beam.pipeline.html#apache_beam.pipeline.Pipeline">[docs]</a><span class="k">class</span> <span class="nc">Pipeline</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A pipeline object that manages a DAG of</span>
<span class="sd"> :class:`~apache_beam.pvalue.PValue` s and their</span>
<span class="sd"> :class:`~apache_beam.transforms.ptransform.PTransform` s.</span>
<span class="sd"> Conceptually the :class:`~apache_beam.pvalue.PValue` s are the DAG&#39;s nodes and</span>
<span class="sd"> the :class:`~apache_beam.transforms.ptransform.PTransform` s computing</span>
<span class="sd"> the :class:`~apache_beam.pvalue.PValue` s are the edges.</span>
<span class="sd"> All the transforms applied to the pipeline must have distinct full labels.</span>
<span class="sd"> If same transform instance needs to be applied then the right shift operator</span>
<span class="sd"> should be used to designate new names</span>
<span class="sd"> (e.g. ``input | &quot;label&quot; &gt;&gt; my_tranform``).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">runner</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">options</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">argv</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initialize a pipeline object.</span>
<span class="sd"> Args:</span>
<span class="sd"> runner (~apache_beam.runners.runner.PipelineRunner): An object of</span>
<span class="sd"> type :class:`~apache_beam.runners.runner.PipelineRunner` that will be</span>
<span class="sd"> used to execute the pipeline. For registered runners, the runner name</span>
<span class="sd"> can be specified, otherwise a runner object must be supplied.</span>
<span class="sd"> options (~apache_beam.options.pipeline_options.PipelineOptions):</span>
<span class="sd"> A configured</span>
<span class="sd"> :class:`~apache_beam.options.pipeline_options.PipelineOptions` object</span>
<span class="sd"> containing arguments that should be used for running the Beam job.</span>
<span class="sd"> argv (List[str]): a list of arguments (such as :data:`sys.argv`)</span>
<span class="sd"> to be used for building a</span>
<span class="sd"> :class:`~apache_beam.options.pipeline_options.PipelineOptions` object.</span>
<span class="sd"> This will only be used if argument **options** is :data:`None`.</span>
<span class="sd"> Raises:</span>
<span class="sd"> ~exceptions.ValueError: if either the runner or options argument is not</span>
<span class="sd"> of the expected type.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">options</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">options</span><span class="p">,</span> <span class="n">PipelineOptions</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_options</span> <span class="o">=</span> <span class="n">options</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;Parameter options, if specified, must be of type PipelineOptions. &#39;</span>
<span class="s1">&#39;Received : </span><span class="si">%r</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">options</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">argv</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">argv</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_options</span> <span class="o">=</span> <span class="n">PipelineOptions</span><span class="p">(</span><span class="n">argv</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;Parameter argv, if specified, must be a list. Received : </span><span class="si">%r</span><span class="s1">&#39;</span>
<span class="o">%</span> <span class="n">argv</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_options</span> <span class="o">=</span> <span class="n">PipelineOptions</span><span class="p">([])</span>
<span class="n">FileSystems</span><span class="o">.</span><span class="n">set_options</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="p">)</span>
<span class="k">if</span> <span class="n">runner</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">runner</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">StandardOptions</span><span class="p">)</span><span class="o">.</span><span class="n">runner</span>
<span class="k">if</span> <span class="n">runner</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">runner</span> <span class="o">=</span> <span class="n">StandardOptions</span><span class="o">.</span><span class="n">DEFAULT_RUNNER</span>
<span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">((</span><span class="s1">&#39;Missing pipeline option (runner). Executing pipeline &#39;</span>
<span class="s1">&#39;using the default runner: </span><span class="si">%s</span><span class="s1">.&#39;</span><span class="p">),</span> <span class="n">runner</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">runner</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="n">runner</span> <span class="o">=</span> <span class="n">create_runner</span><span class="p">(</span><span class="n">runner</span><span class="p">)</span>
<span class="k">elif</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">runner</span><span class="p">,</span> <span class="n">PipelineRunner</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">&#39;Runner must be a PipelineRunner object or the &#39;</span>
<span class="s1">&#39;name of a registered runner.&#39;</span><span class="p">)</span>
<span class="c1"># Validate pipeline options</span>
<span class="n">errors</span> <span class="o">=</span> <span class="n">PipelineOptionsValidator</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="p">,</span> <span class="n">runner</span><span class="p">)</span><span class="o">.</span><span class="n">validate</span><span class="p">()</span>
<span class="k">if</span> <span class="n">errors</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;Pipeline has validations errors: </span><span class="se">\n</span><span class="s1">&#39;</span> <span class="o">+</span> <span class="s1">&#39;</span><span class="se">\n</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">errors</span><span class="p">))</span>
<span class="c1"># set default experiments for portable runner</span>
<span class="c1"># (needs to occur prior to pipeline construction)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">StandardOptions</span><span class="p">)</span><span class="o">.</span><span class="n">runner</span> <span class="o">==</span> <span class="s1">&#39;PortableRunner&#39;</span><span class="p">:</span>
<span class="n">experiments</span> <span class="o">=</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">DebugOptions</span><span class="p">)</span><span class="o">.</span><span class="n">experiments</span> <span class="ow">or</span> <span class="p">[])</span>
<span class="k">if</span> <span class="ow">not</span> <span class="s1">&#39;beam_fn_api&#39;</span> <span class="ow">in</span> <span class="n">experiments</span><span class="p">:</span>
<span class="n">experiments</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;beam_fn_api&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">DebugOptions</span><span class="p">)</span><span class="o">.</span><span class="n">experiments</span> <span class="o">=</span> <span class="n">experiments</span>
<span class="c1"># Default runner to be used.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span> <span class="o">=</span> <span class="n">runner</span>
<span class="c1"># Stack of transforms generated by nested apply() calls. The stack will</span>
<span class="c1"># contain a root node as an enclosing (parent) node for top transforms.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">transforms_stack</span> <span class="o">=</span> <span class="p">[</span><span class="n">AppliedPTransform</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)]</span>
<span class="c1"># Set of transform labels (full labels) applied to the pipeline.</span>
<span class="c1"># If a transform is applied and the full label is already in the set</span>
<span class="c1"># then the transform will have to be cloned with a new label.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">applied_labels</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="nd">@property</span>
<span class="nd">@deprecated</span><span class="p">(</span><span class="n">since</span><span class="o">=</span><span class="s1">&#39;First stable release&#39;</span><span class="p">,</span>
<span class="n">extra_message</span><span class="o">=</span><span class="s1">&#39;References to &lt;pipeline&gt;.options&#39;</span>
<span class="s1">&#39; will not be supported&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">options</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_options</span>
<span class="k">def</span> <span class="nf">_current_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns the transform currently on the top of the stack.&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transforms_stack</span><span class="p">[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span>
<span class="k">def</span> <span class="nf">_root_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns the root transform of the transform stack.&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transforms_stack</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">def</span> <span class="nf">_remove_labels_recursively</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">applied_transform</span><span class="p">):</span>
<span class="k">for</span> <span class="n">part</span> <span class="ow">in</span> <span class="n">applied_transform</span><span class="o">.</span><span class="n">parts</span><span class="p">:</span>
<span class="k">if</span> <span class="n">part</span><span class="o">.</span><span class="n">full_label</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">applied_labels</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">applied_labels</span><span class="o">.</span><span class="n">remove</span><span class="p">(</span><span class="n">part</span><span class="o">.</span><span class="n">full_label</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_remove_labels_recursively</span><span class="p">(</span><span class="n">part</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_replace</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">override</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">override</span><span class="p">,</span> <span class="n">PTransformOverride</span><span class="p">)</span>
<span class="n">output_map</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">output_replacements</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">input_replacements</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">side_input_replacements</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">class</span> <span class="nc">TransformUpdater</span><span class="p">(</span><span class="n">PipelineVisitor</span><span class="p">):</span> <span class="c1"># pylint: disable=used-before-assignment</span>
<span class="sd">&quot;&quot;&quot;&quot;A visitor that replaces the matching PTransforms.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pipeline</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">=</span> <span class="n">pipeline</span>
<span class="k">def</span> <span class="nf">_replace_if_needed</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">original_transform_node</span><span class="p">):</span>
<span class="k">if</span> <span class="n">override</span><span class="o">.</span><span class="n">matches</span><span class="p">(</span><span class="n">original_transform_node</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">original_transform_node</span><span class="p">,</span> <span class="n">AppliedPTransform</span><span class="p">)</span>
<span class="n">replacement_transform</span> <span class="o">=</span> <span class="n">override</span><span class="o">.</span><span class="n">get_replacement_transform</span><span class="p">(</span>
<span class="n">original_transform_node</span><span class="o">.</span><span class="n">transform</span><span class="p">)</span>
<span class="k">if</span> <span class="n">replacement_transform</span> <span class="ow">is</span> <span class="n">original_transform_node</span><span class="o">.</span><span class="n">transform</span><span class="p">:</span>
<span class="k">return</span>
<span class="n">replacement_transform_node</span> <span class="o">=</span> <span class="n">AppliedPTransform</span><span class="p">(</span>
<span class="n">original_transform_node</span><span class="o">.</span><span class="n">parent</span><span class="p">,</span> <span class="n">replacement_transform</span><span class="p">,</span>
<span class="n">original_transform_node</span><span class="o">.</span><span class="n">full_label</span><span class="p">,</span>
<span class="n">original_transform_node</span><span class="o">.</span><span class="n">inputs</span><span class="p">)</span>
<span class="c1"># Transform execution could depend on order in which nodes are</span>
<span class="c1"># considered. Hence we insert the replacement transform node to same</span>
<span class="c1"># index as the original transform node. Note that this operation</span>
<span class="c1"># removes the original transform node.</span>
<span class="k">if</span> <span class="n">original_transform_node</span><span class="o">.</span><span class="n">parent</span><span class="p">:</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">original_transform_node</span><span class="o">.</span><span class="n">parent</span><span class="p">,</span> <span class="n">AppliedPTransform</span><span class="p">)</span>
<span class="n">parent_parts</span> <span class="o">=</span> <span class="n">original_transform_node</span><span class="o">.</span><span class="n">parent</span><span class="o">.</span><span class="n">parts</span>
<span class="n">parent_parts</span><span class="p">[</span><span class="n">parent_parts</span><span class="o">.</span><span class="n">index</span><span class="p">(</span><span class="n">original_transform_node</span><span class="p">)]</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">replacement_transform_node</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># Original transform has to be a root.</span>
<span class="n">roots</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">transforms_stack</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">parts</span>
<span class="k">assert</span> <span class="n">original_transform_node</span> <span class="ow">in</span> <span class="n">roots</span>
<span class="n">roots</span><span class="p">[</span><span class="n">roots</span><span class="o">.</span><span class="n">index</span><span class="p">(</span><span class="n">original_transform_node</span><span class="p">)]</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">replacement_transform_node</span><span class="p">)</span>
<span class="n">inputs</span> <span class="o">=</span> <span class="n">replacement_transform_node</span><span class="o">.</span><span class="n">inputs</span>
<span class="c1"># TODO: Support replacing PTransforms with multiple inputs.</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">inputs</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">1</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span>
<span class="s1">&#39;PTransform overriding is only supported for PTransforms that &#39;</span>
<span class="s1">&#39;have a single input. Tried to replace input of &#39;</span>
<span class="s1">&#39;AppliedPTransform </span><span class="si">%r</span><span class="s1"> that has </span><span class="si">%d</span><span class="s1"> inputs&#39;</span>
<span class="o">%</span> <span class="n">original_transform_node</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">inputs</span><span class="p">))</span>
<span class="k">elif</span> <span class="nb">len</span><span class="p">(</span><span class="n">inputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="n">input_node</span> <span class="o">=</span> <span class="n">inputs</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">elif</span> <span class="nb">len</span><span class="p">(</span><span class="n">inputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="n">input_node</span> <span class="o">=</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="c1"># We have to add the new AppliedTransform to the stack before expand()</span>
<span class="c1"># and pop it out later to make sure that parts get added correctly.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">transforms_stack</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">replacement_transform_node</span><span class="p">)</span>
<span class="c1"># Keeping the same label for the replaced node but recursively</span>
<span class="c1"># removing labels of child transforms of original transform since they</span>
<span class="c1"># will be replaced during the expand below. This is needed in case</span>
<span class="c1"># the replacement contains children that have labels that conflicts</span>
<span class="c1"># with labels of the children of the original.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">_remove_labels_recursively</span><span class="p">(</span><span class="n">original_transform_node</span><span class="p">)</span>
<span class="n">new_output</span> <span class="o">=</span> <span class="n">replacement_transform</span><span class="o">.</span><span class="n">expand</span><span class="p">(</span><span class="n">input_node</span><span class="p">)</span>
<span class="n">new_output</span><span class="o">.</span><span class="n">element_type</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">_infer_result_type</span><span class="p">(</span><span class="n">replacement_transform</span><span class="p">,</span> <span class="n">inputs</span><span class="p">,</span>
<span class="n">new_output</span><span class="p">)</span>
<span class="n">replacement_transform_node</span><span class="o">.</span><span class="n">add_output</span><span class="p">(</span><span class="n">new_output</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">new_output</span><span class="o">.</span><span class="n">producer</span><span class="p">:</span>
<span class="n">new_output</span><span class="o">.</span><span class="n">producer</span> <span class="o">=</span> <span class="n">replacement_transform_node</span>
<span class="c1"># We only support replacing transforms with a single output with</span>
<span class="c1"># another transform that produces a single output.</span>
<span class="c1"># TODO: Support replacing PTransforms with multiple outputs.</span>
<span class="k">if</span> <span class="p">(</span><span class="nb">len</span><span class="p">(</span><span class="n">original_transform_node</span><span class="o">.</span><span class="n">outputs</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">1</span> <span class="ow">or</span>
<span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">original_transform_node</span><span class="o">.</span><span class="n">outputs</span><span class="p">[</span><span class="kc">None</span><span class="p">],</span>
<span class="p">(</span><span class="n">PCollection</span><span class="p">,</span> <span class="n">PDone</span><span class="p">))</span> <span class="ow">or</span>
<span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">new_output</span><span class="p">,</span> <span class="p">(</span><span class="n">PCollection</span><span class="p">,</span> <span class="n">PDone</span><span class="p">))):</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span>
<span class="s1">&#39;PTransform overriding is only supported for PTransforms that &#39;</span>
<span class="s1">&#39;have a single output. Tried to replace output of &#39;</span>
<span class="s1">&#39;AppliedPTransform </span><span class="si">%r</span><span class="s1"> with </span><span class="si">%r</span><span class="s1">.&#39;</span>
<span class="o">%</span> <span class="p">(</span><span class="n">original_transform_node</span><span class="p">,</span> <span class="n">new_output</span><span class="p">))</span>
<span class="c1"># Recording updated outputs. This cannot be done in the same visitor</span>
<span class="c1"># since if we dynamically update output type here, we&#39;ll run into</span>
<span class="c1"># errors when visiting child nodes.</span>
<span class="n">output_map</span><span class="p">[</span><span class="n">original_transform_node</span><span class="o">.</span><span class="n">outputs</span><span class="p">[</span><span class="kc">None</span><span class="p">]]</span> <span class="o">=</span> <span class="n">new_output</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">transforms_stack</span><span class="o">.</span><span class="n">pop</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">enter_composite_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_replace_if_needed</span><span class="p">(</span><span class="n">transform_node</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">visit_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_replace_if_needed</span><span class="p">(</span><span class="n">transform_node</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">TransformUpdater</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="c1"># Adjusting inputs and outputs</span>
<span class="k">class</span> <span class="nc">InputOutputUpdater</span><span class="p">(</span><span class="n">PipelineVisitor</span><span class="p">):</span> <span class="c1"># pylint: disable=used-before-assignment</span>
<span class="sd">&quot;&quot;&quot;&quot;A visitor that records input and output values to be replaced.</span>
<span class="sd"> Input and output values that should be updated are recorded in maps</span>
<span class="sd"> input_replacements and output_replacements respectively.</span>
<span class="sd"> We cannot update input and output values while visiting since that results</span>
<span class="sd"> in validation errors.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pipeline</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">=</span> <span class="n">pipeline</span>
<span class="k">def</span> <span class="nf">enter_composite_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">visit_transform</span><span class="p">(</span><span class="n">transform_node</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">visit_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="k">if</span> <span class="p">(</span><span class="kc">None</span> <span class="ow">in</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">outputs</span> <span class="ow">and</span>
<span class="n">transform_node</span><span class="o">.</span><span class="n">outputs</span><span class="p">[</span><span class="kc">None</span><span class="p">]</span> <span class="ow">in</span> <span class="n">output_map</span><span class="p">):</span>
<span class="n">output_replacements</span><span class="p">[</span><span class="n">transform_node</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">output_map</span><span class="p">[</span><span class="n">transform_node</span><span class="o">.</span><span class="n">outputs</span><span class="p">[</span><span class="kc">None</span><span class="p">]])</span>
<span class="n">replace_input</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">for</span> <span class="nb">input</span> <span class="ow">in</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">inputs</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">input</span> <span class="ow">in</span> <span class="n">output_map</span><span class="p">:</span>
<span class="n">replace_input</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">break</span>
<span class="n">replace_side_inputs</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">for</span> <span class="n">side_input</span> <span class="ow">in</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">:</span>
<span class="k">if</span> <span class="n">side_input</span><span class="o">.</span><span class="n">pvalue</span> <span class="ow">in</span> <span class="n">output_map</span><span class="p">:</span>
<span class="n">replace_side_inputs</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">break</span>
<span class="k">if</span> <span class="n">replace_input</span><span class="p">:</span>
<span class="n">new_input</span> <span class="o">=</span> <span class="p">[</span>
<span class="nb">input</span> <span class="k">if</span> <span class="ow">not</span> <span class="nb">input</span> <span class="ow">in</span> <span class="n">output_map</span> <span class="k">else</span> <span class="n">output_map</span><span class="p">[</span><span class="nb">input</span><span class="p">]</span>
<span class="k">for</span> <span class="nb">input</span> <span class="ow">in</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">inputs</span><span class="p">]</span>
<span class="n">input_replacements</span><span class="p">[</span><span class="n">transform_node</span><span class="p">]</span> <span class="o">=</span> <span class="n">new_input</span>
<span class="k">if</span> <span class="n">replace_side_inputs</span><span class="p">:</span>
<span class="n">new_side_inputs</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">side_input</span> <span class="ow">in</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">:</span>
<span class="k">if</span> <span class="n">side_input</span><span class="o">.</span><span class="n">pvalue</span> <span class="ow">in</span> <span class="n">output_map</span><span class="p">:</span>
<span class="n">side_input</span><span class="o">.</span><span class="n">pvalue</span> <span class="o">=</span> <span class="n">output_map</span><span class="p">[</span><span class="n">side_input</span><span class="o">.</span><span class="n">pvalue</span><span class="p">]</span>
<span class="n">new_side_inputs</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">side_input</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">new_side_inputs</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">side_input</span><span class="p">)</span>
<span class="n">side_input_replacements</span><span class="p">[</span><span class="n">transform_node</span><span class="p">]</span> <span class="o">=</span> <span class="n">new_side_inputs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">InputOutputUpdater</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="k">for</span> <span class="n">transform</span> <span class="ow">in</span> <span class="n">output_replacements</span><span class="p">:</span>
<span class="n">transform</span><span class="o">.</span><span class="n">replace_output</span><span class="p">(</span><span class="n">output_replacements</span><span class="p">[</span><span class="n">transform</span><span class="p">])</span>
<span class="k">for</span> <span class="n">transform</span> <span class="ow">in</span> <span class="n">input_replacements</span><span class="p">:</span>
<span class="n">transform</span><span class="o">.</span><span class="n">inputs</span> <span class="o">=</span> <span class="n">input_replacements</span><span class="p">[</span><span class="n">transform</span><span class="p">]</span>
<span class="k">for</span> <span class="n">transform</span> <span class="ow">in</span> <span class="n">side_input_replacements</span><span class="p">:</span>
<span class="n">transform</span><span class="o">.</span><span class="n">side_inputs</span> <span class="o">=</span> <span class="n">side_input_replacements</span><span class="p">[</span><span class="n">transform</span><span class="p">]</span>
<span class="k">def</span> <span class="nf">_check_replacement</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">override</span><span class="p">):</span>
<span class="k">class</span> <span class="nc">ReplacementValidator</span><span class="p">(</span><span class="n">PipelineVisitor</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">visit_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="k">if</span> <span class="n">override</span><span class="o">.</span><span class="n">matches</span><span class="p">(</span><span class="n">transform_node</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span><span class="s1">&#39;Transform node </span><span class="si">%r</span><span class="s1"> was not replaced as expected.&#39;</span>
<span class="o">%</span> <span class="n">transform_node</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">ReplacementValidator</span><span class="p">())</span>
<div class="viewcode-block" id="Pipeline.replace_all"><a class="viewcode-back" href="../../apache_beam.pipeline.html#apache_beam.pipeline.Pipeline.replace_all">[docs]</a> <span class="k">def</span> <span class="nf">replace_all</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">replacements</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot; Dynamically replaces PTransforms in the currently populated hierarchy.</span>
<span class="sd"> Currently this only works for replacements where input and output types</span>
<span class="sd"> are exactly the same.</span>
<span class="sd"> TODO: Update this to also work for transform overrides where input and</span>
<span class="sd"> output types are different.</span>
<span class="sd"> Args:</span>
<span class="sd"> replacements (List[~apache_beam.pipeline.PTransformOverride]): a list of</span>
<span class="sd"> :class:`~apache_beam.pipeline.PTransformOverride` objects.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">for</span> <span class="n">override</span> <span class="ow">in</span> <span class="n">replacements</span><span class="p">:</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">override</span><span class="p">,</span> <span class="n">PTransformOverride</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_replace</span><span class="p">(</span><span class="n">override</span><span class="p">)</span>
<span class="c1"># Checking if the PTransforms have been successfully replaced. This will</span>
<span class="c1"># result in a failure if a PTransform that was replaced in a given override</span>
<span class="c1"># gets re-added in a subsequent override. This is not allowed and ordering</span>
<span class="c1"># of PTransformOverride objects in &#39;replacements&#39; is important.</span>
<span class="k">for</span> <span class="n">override</span> <span class="ow">in</span> <span class="n">replacements</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_check_replacement</span><span class="p">(</span><span class="n">override</span><span class="p">)</span></div>
<div class="viewcode-block" id="Pipeline.run"><a class="viewcode-back" href="../../apache_beam.pipeline.html#apache_beam.pipeline.Pipeline.run">[docs]</a> <span class="k">def</span> <span class="nf">run</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">test_runner_api</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Runs the pipeline. Returns whatever our runner returns after running.&quot;&quot;&quot;</span>
<span class="c1"># When possible, invoke a round trip through the runner API.</span>
<span class="k">if</span> <span class="n">test_runner_api</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">_verify_runner_api_compatible</span><span class="p">():</span>
<span class="k">return</span> <span class="n">Pipeline</span><span class="o">.</span><span class="n">from_runner_api</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">(</span><span class="n">use_fake_coders</span><span class="o">=</span><span class="kc">True</span><span class="p">),</span>
<span class="bp">self</span><span class="o">.</span><span class="n">runner</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="p">)</span><span class="o">.</span><span class="n">run</span><span class="p">(</span><span class="kc">False</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">TypeOptions</span><span class="p">)</span><span class="o">.</span><span class="n">runtime_type_check</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="k">import</span> <span class="n">typecheck</span>
<span class="bp">self</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">typecheck</span><span class="o">.</span><span class="n">TypeCheckVisitor</span><span class="p">())</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">SetupOptions</span><span class="p">)</span><span class="o">.</span><span class="n">save_main_session</span><span class="p">:</span>
<span class="c1"># If this option is chosen, verify we can pickle the main session early.</span>
<span class="n">tmpdir</span> <span class="o">=</span> <span class="n">tempfile</span><span class="o">.</span><span class="n">mkdtemp</span><span class="p">()</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">pickler</span><span class="o">.</span><span class="n">dump_session</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">tmpdir</span><span class="p">,</span> <span class="s1">&#39;main_session.pickle&#39;</span><span class="p">))</span>
<span class="k">finally</span><span class="p">:</span>
<span class="n">shutil</span><span class="o">.</span><span class="n">rmtree</span><span class="p">(</span><span class="n">tmpdir</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">run_pipeline</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="p">)</span></div>
<span class="k">def</span> <span class="nf">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span>
<span class="k">def</span> <span class="nf">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exc_type</span><span class="p">,</span> <span class="n">exc_val</span><span class="p">,</span> <span class="n">exc_tb</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">exc_type</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">run</span><span class="p">()</span><span class="o">.</span><span class="n">wait_until_finish</span><span class="p">()</span>
<div class="viewcode-block" id="Pipeline.visit"><a class="viewcode-back" href="../../apache_beam.pipeline.html#apache_beam.pipeline.Pipeline.visit">[docs]</a> <span class="k">def</span> <span class="nf">visit</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">visitor</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Visits depth-first every node of a pipeline&#39;s DAG.</span>
<span class="sd"> Runner-internal implementation detail; no backwards-compatibility guarantees</span>
<span class="sd"> Args:</span>
<span class="sd"> visitor (~apache_beam.pipeline.PipelineVisitor):</span>
<span class="sd"> :class:`~apache_beam.pipeline.PipelineVisitor` object whose callbacks</span>
<span class="sd"> will be called for each node visited. See</span>
<span class="sd"> :class:`~apache_beam.pipeline.PipelineVisitor` comments.</span>
<span class="sd"> Raises:</span>
<span class="sd"> ~exceptions.TypeError: if node is specified and is not a</span>
<span class="sd"> :class:`~apache_beam.pvalue.PValue`.</span>
<span class="sd"> ~apache_beam.error.PipelineError: if node is specified and does not</span>
<span class="sd"> belong to this pipeline instance.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">visited</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_root_transform</span><span class="p">()</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">visitor</span><span class="p">,</span> <span class="bp">self</span><span class="p">,</span> <span class="n">visited</span><span class="p">)</span></div>
<div class="viewcode-block" id="Pipeline.apply"><a class="viewcode-back" href="../../apache_beam.pipeline.html#apache_beam.pipeline.Pipeline.apply">[docs]</a> <span class="k">def</span> <span class="nf">apply</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform</span><span class="p">,</span> <span class="n">pvalueish</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Applies a custom transform using the pvalueish specified.</span>
<span class="sd"> Args:</span>
<span class="sd"> transform (~apache_beam.transforms.ptransform.PTransform): the</span>
<span class="sd"> :class:`~apache_beam.transforms.ptransform.PTransform` to apply.</span>
<span class="sd"> pvalueish (~apache_beam.pvalue.PCollection): the input for the</span>
<span class="sd"> :class:`~apache_beam.transforms.ptransform.PTransform` (typically a</span>
<span class="sd"> :class:`~apache_beam.pvalue.PCollection`).</span>
<span class="sd"> label (str): label of the</span>
<span class="sd"> :class:`~apache_beam.transforms.ptransform.PTransform`.</span>
<span class="sd"> Raises:</span>
<span class="sd"> ~exceptions.TypeError: if the transform object extracted from the</span>
<span class="sd"> argument list is not a</span>
<span class="sd"> :class:`~apache_beam.transforms.ptransform.PTransform`.</span>
<span class="sd"> ~exceptions.RuntimeError: if the transform object was already applied to</span>
<span class="sd"> this pipeline and needs to be cloned in order to apply again.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">_NamedPTransform</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">apply</span><span class="p">(</span><span class="n">transform</span><span class="o">.</span><span class="n">transform</span><span class="p">,</span> <span class="n">pvalueish</span><span class="p">,</span>
<span class="n">label</span> <span class="ow">or</span> <span class="n">transform</span><span class="o">.</span><span class="n">label</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;Expected a PTransform object, got </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">transform</span><span class="p">)</span>
<span class="k">if</span> <span class="n">label</span><span class="p">:</span>
<span class="c1"># Fix self.label as it is inspected by some PTransform operations</span>
<span class="c1"># (e.g. to produce error messages for type hint violations).</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">old_label</span><span class="p">,</span> <span class="n">transform</span><span class="o">.</span><span class="n">label</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">label</span><span class="p">,</span> <span class="n">label</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">apply</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="n">pvalueish</span><span class="p">)</span>
<span class="k">finally</span><span class="p">:</span>
<span class="n">transform</span><span class="o">.</span><span class="n">label</span> <span class="o">=</span> <span class="n">old_label</span>
<span class="n">full_label</span> <span class="o">=</span> <span class="s1">&#39;/&#39;</span><span class="o">.</span><span class="n">join</span><span class="p">([</span><span class="bp">self</span><span class="o">.</span><span class="n">_current_transform</span><span class="p">()</span><span class="o">.</span><span class="n">full_label</span><span class="p">,</span>
<span class="n">label</span> <span class="ow">or</span> <span class="n">transform</span><span class="o">.</span><span class="n">label</span><span class="p">])</span><span class="o">.</span><span class="n">lstrip</span><span class="p">(</span><span class="s1">&#39;/&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">full_label</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">applied_labels</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span>
<span class="s1">&#39;Transform &quot;</span><span class="si">%s</span><span class="s1">&quot; does not have a stable unique label. &#39;</span>
<span class="s1">&#39;This will prevent updating of pipelines. &#39;</span>
<span class="s1">&#39;To apply a transform with a specified label write &#39;</span>
<span class="s1">&#39;pvalue | &quot;label&quot; &gt;&gt; transform&#39;</span>
<span class="o">%</span> <span class="n">full_label</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">applied_labels</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">full_label</span><span class="p">)</span>
<span class="n">pvalueish</span><span class="p">,</span> <span class="n">inputs</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">_extract_input_pvalues</span><span class="p">(</span><span class="n">pvalueish</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">inputs</span> <span class="o">=</span> <span class="nb">tuple</span><span class="p">(</span><span class="n">inputs</span><span class="p">)</span>
<span class="k">for</span> <span class="n">leaf_input</span> <span class="ow">in</span> <span class="n">inputs</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">leaf_input</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PValue</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span>
<span class="s1">&#39;Unable to extract PValue inputs from </span><span class="si">%s</span><span class="s1">; either </span><span class="si">%s</span><span class="s1"> does not accept &#39;</span>
<span class="s1">&#39;inputs of this format, or it does not properly override &#39;</span>
<span class="s1">&#39;_extract_input_pvalues&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">pvalueish</span><span class="p">,</span> <span class="n">transform</span><span class="p">))</span>
<span class="n">current</span> <span class="o">=</span> <span class="n">AppliedPTransform</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_current_transform</span><span class="p">(),</span> <span class="n">transform</span><span class="p">,</span> <span class="n">full_label</span><span class="p">,</span> <span class="n">inputs</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_current_transform</span><span class="p">()</span><span class="o">.</span><span class="n">add_part</span><span class="p">(</span><span class="n">current</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">transforms_stack</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">current</span><span class="p">)</span>
<span class="n">type_options</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">TypeOptions</span><span class="p">)</span>
<span class="k">if</span> <span class="n">type_options</span><span class="o">.</span><span class="n">pipeline_type_check</span><span class="p">:</span>
<span class="n">transform</span><span class="o">.</span><span class="n">type_check_inputs</span><span class="p">(</span><span class="n">pvalueish</span><span class="p">)</span>
<span class="n">pvalueish_result</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">apply</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="n">pvalueish</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="p">)</span>
<span class="k">if</span> <span class="n">type_options</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">type_options</span><span class="o">.</span><span class="n">pipeline_type_check</span><span class="p">:</span>
<span class="n">transform</span><span class="o">.</span><span class="n">type_check_outputs</span><span class="p">(</span><span class="n">pvalueish_result</span><span class="p">)</span>
<span class="k">for</span> <span class="n">result</span> <span class="ow">in</span> <span class="n">ptransform</span><span class="o">.</span><span class="n">get_nested_pvalues</span><span class="p">(</span><span class="n">pvalueish_result</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="p">(</span><span class="n">pvalue</span><span class="o">.</span><span class="n">PValue</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">DoOutputsTuple</span><span class="p">))</span>
<span class="c1"># Make sure we set the producer only for a leaf node in the transform DAG.</span>
<span class="c1"># This way we preserve the last transform of a composite transform as</span>
<span class="c1"># being the real producer of the result.</span>
<span class="k">if</span> <span class="n">result</span><span class="o">.</span><span class="n">producer</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">result</span><span class="o">.</span><span class="n">producer</span> <span class="o">=</span> <span class="n">current</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_infer_result_type</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="n">inputs</span><span class="p">,</span> <span class="n">result</span><span class="p">)</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">result</span><span class="o">.</span><span class="n">producer</span><span class="o">.</span><span class="n">inputs</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">)</span>
<span class="n">current</span><span class="o">.</span><span class="n">add_output</span><span class="p">(</span><span class="n">result</span><span class="p">)</span>
<span class="k">if</span> <span class="p">(</span><span class="n">type_options</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span>
<span class="n">type_options</span><span class="o">.</span><span class="n">type_check_strictness</span> <span class="o">==</span> <span class="s1">&#39;ALL_REQUIRED&#39;</span> <span class="ow">and</span>
<span class="n">transform</span><span class="o">.</span><span class="n">get_type_hints</span><span class="p">()</span><span class="o">.</span><span class="n">output_types</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">):</span>
<span class="n">ptransform_name</span> <span class="o">=</span> <span class="s1">&#39;</span><span class="si">%s</span><span class="s1">(</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">transform</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">full_label</span><span class="p">)</span>
<span class="k">raise</span> <span class="n">TypeCheckError</span><span class="p">(</span><span class="s1">&#39;Pipeline type checking is enabled, however no &#39;</span>
<span class="s1">&#39;output type-hint was found for the &#39;</span>
<span class="s1">&#39;PTransform </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">ptransform_name</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">transforms_stack</span><span class="o">.</span><span class="n">pop</span><span class="p">()</span>
<span class="k">return</span> <span class="n">pvalueish_result</span></div>
<span class="k">def</span> <span class="nf">_infer_result_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform</span><span class="p">,</span> <span class="n">inputs</span><span class="p">,</span> <span class="n">result_pcollection</span><span class="p">):</span>
<span class="c1"># TODO(robertwb): Multi-input, multi-output inference.</span>
<span class="n">type_options</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">TypeOptions</span><span class="p">)</span>
<span class="k">if</span> <span class="p">(</span><span class="n">type_options</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">type_options</span><span class="o">.</span><span class="n">pipeline_type_check</span>
<span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">result_pcollection</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">)</span>
<span class="ow">and</span> <span class="p">(</span><span class="ow">not</span> <span class="n">result_pcollection</span><span class="o">.</span><span class="n">element_type</span>
<span class="c1"># TODO(robertwb): Ideally we&#39;d do intersection here.</span>
<span class="ow">or</span> <span class="n">result_pcollection</span><span class="o">.</span><span class="n">element_type</span> <span class="o">==</span> <span class="n">typehints</span><span class="o">.</span><span class="n">Any</span><span class="p">)):</span>
<span class="n">input_element_type</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">inputs</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">element_type</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">inputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="k">else</span> <span class="n">typehints</span><span class="o">.</span><span class="n">Any</span><span class="p">)</span>
<span class="n">type_hints</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">get_type_hints</span><span class="p">()</span>
<span class="n">declared_output_type</span> <span class="o">=</span> <span class="n">type_hints</span><span class="o">.</span><span class="n">simple_output_type</span><span class="p">(</span><span class="n">transform</span><span class="o">.</span><span class="n">label</span><span class="p">)</span>
<span class="k">if</span> <span class="n">declared_output_type</span><span class="p">:</span>
<span class="n">input_types</span> <span class="o">=</span> <span class="n">type_hints</span><span class="o">.</span><span class="n">input_types</span>
<span class="k">if</span> <span class="n">input_types</span> <span class="ow">and</span> <span class="n">input_types</span><span class="p">[</span><span class="mi">0</span><span class="p">]:</span>
<span class="n">declared_input_type</span> <span class="o">=</span> <span class="n">input_types</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span>
<span class="n">result_pcollection</span><span class="o">.</span><span class="n">element_type</span> <span class="o">=</span> <span class="n">typehints</span><span class="o">.</span><span class="n">bind_type_variables</span><span class="p">(</span>
<span class="n">declared_output_type</span><span class="p">,</span>
<span class="n">typehints</span><span class="o">.</span><span class="n">match_type_variables</span><span class="p">(</span><span class="n">declared_input_type</span><span class="p">,</span>
<span class="n">input_element_type</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">result_pcollection</span><span class="o">.</span><span class="n">element_type</span> <span class="o">=</span> <span class="n">declared_output_type</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">result_pcollection</span><span class="o">.</span><span class="n">element_type</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">infer_output_type</span><span class="p">(</span>
<span class="n">input_element_type</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">__reduce__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># Some transforms contain a reference to their enclosing pipeline,</span>
<span class="c1"># which in turn reference all other transforms (resulting in quadratic</span>
<span class="c1"># time/space to pickle each transform individually). As we don&#39;t</span>
<span class="c1"># require pickled pipelines to be executable, break the chain here.</span>
<span class="k">return</span> <span class="nb">str</span><span class="p">,</span> <span class="p">(</span><span class="s1">&#39;Pickled pipeline stub.&#39;</span><span class="p">,)</span>
<span class="k">def</span> <span class="nf">_verify_runner_api_compatible</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">TypeOptions</span><span class="p">)</span><span class="o">.</span><span class="n">runtime_type_check</span><span class="p">:</span>
<span class="c1"># This option is incompatible with the runner API as it requires</span>
<span class="c1"># the runner to inspect non-serialized hints on the transform</span>
<span class="c1"># itself.</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">class</span> <span class="nc">Visitor</span><span class="p">(</span><span class="n">PipelineVisitor</span><span class="p">):</span> <span class="c1"># pylint: disable=used-before-assignment</span>
<span class="n">ok</span> <span class="o">=</span> <span class="kc">True</span> <span class="c1"># Really a nonlocal.</span>
<span class="k">def</span> <span class="nf">enter_composite_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">visit_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="c1"># Transforms must be picklable.</span>
<span class="n">pickler</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">pickler</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="n">transform_node</span><span class="o">.</span><span class="n">transform</span><span class="p">,</span>
<span class="n">enable_trace</span><span class="o">=</span><span class="kc">False</span><span class="p">),</span>
<span class="n">enable_trace</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span>
<span class="n">Visitor</span><span class="o">.</span><span class="n">ok</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">def</span> <span class="nf">visit_value</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">,</span> <span class="n">_</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PDone</span><span class="p">):</span>
<span class="n">Visitor</span><span class="o">.</span><span class="n">ok</span> <span class="o">=</span> <span class="kc">False</span>
<span class="bp">self</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">Visitor</span><span class="p">())</span>
<span class="k">return</span> <span class="n">Visitor</span><span class="o">.</span><span class="n">ok</span>
<div class="viewcode-block" id="Pipeline.to_runner_api"><a class="viewcode-back" href="../../apache_beam.pipeline.html#apache_beam.pipeline.Pipeline.to_runner_api">[docs]</a> <span class="k">def</span> <span class="nf">to_runner_api</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">return_context</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">context</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">use_fake_coders</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">default_environment</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;For internal use only; no backwards-compatibility guarantees.&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners</span> <span class="k">import</span> <span class="n">pipeline_context</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability.api</span> <span class="k">import</span> <span class="n">beam_runner_api_pb2</span>
<span class="k">if</span> <span class="n">context</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">context</span> <span class="o">=</span> <span class="n">pipeline_context</span><span class="o">.</span><span class="n">PipelineContext</span><span class="p">(</span>
<span class="n">use_fake_coders</span><span class="o">=</span><span class="n">use_fake_coders</span><span class="p">,</span>
<span class="n">default_environment</span><span class="o">=</span><span class="n">default_environment</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">default_environment</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;Only one of context or default_environment may be specified.&#39;</span><span class="p">)</span>
<span class="c1"># The RunnerAPI spec requires certain transforms and side-inputs to have KV</span>
<span class="c1"># inputs (and corresponding outputs).</span>
<span class="c1"># Currently we only upgrade to KV pairs. If there is a need for more</span>
<span class="c1"># general shapes, potential conflicts will have to be resolved.</span>
<span class="c1"># We also only handle single-input, and (for fixing the output) single</span>
<span class="c1"># output, which is sufficient.</span>
<span class="k">class</span> <span class="nc">ForceKvInputTypes</span><span class="p">(</span><span class="n">PipelineVisitor</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">enter_composite_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">visit_transform</span><span class="p">(</span><span class="n">transform_node</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">visit_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">transform</span><span class="p">:</span>
<span class="k">return</span>
<span class="k">if</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">runner_api_requires_keyed_input</span><span class="p">():</span>
<span class="n">pcoll</span> <span class="o">=</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">inputs</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span> <span class="o">=</span> <span class="n">typehints</span><span class="o">.</span><span class="n">coerce_to_kv_type</span><span class="p">(</span>
<span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">,</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">full_label</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">transform_node</span><span class="o">.</span><span class="n">outputs</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="c1"># The runner often has expectations about the output types as well.</span>
<span class="n">output</span><span class="p">,</span> <span class="o">=</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">outputs</span><span class="o">.</span><span class="n">values</span><span class="p">()</span>
<span class="n">output</span><span class="o">.</span><span class="n">element_type</span> <span class="o">=</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">infer_output_type</span><span class="p">(</span>
<span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">)</span>
<span class="k">for</span> <span class="n">side_input</span> <span class="ow">in</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">:</span>
<span class="k">if</span> <span class="n">side_input</span><span class="o">.</span><span class="n">requires_keyed_input</span><span class="p">():</span>
<span class="n">side_input</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">element_type</span> <span class="o">=</span> <span class="n">typehints</span><span class="o">.</span><span class="n">coerce_to_kv_type</span><span class="p">(</span>
<span class="n">side_input</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">element_type</span><span class="p">,</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">full_label</span><span class="p">,</span>
<span class="n">side_input_producer</span><span class="o">=</span><span class="n">side_input</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">producer</span><span class="o">.</span><span class="n">full_label</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">ForceKvInputTypes</span><span class="p">())</span>
<span class="c1"># Mutates context; placing inline would force dependence on</span>
<span class="c1"># argument evaluation order.</span>
<span class="n">root_transform_id</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="n">transforms</span><span class="o">.</span><span class="n">get_id</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_root_transform</span><span class="p">())</span>
<span class="n">proto</span> <span class="o">=</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">(</span>
<span class="n">root_transform_ids</span><span class="o">=</span><span class="p">[</span><span class="n">root_transform_id</span><span class="p">],</span>
<span class="n">components</span><span class="o">=</span><span class="n">context</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">())</span>
<span class="n">proto</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">root_transform_id</span><span class="p">]</span><span class="o">.</span><span class="n">unique_name</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">root_transform_id</span><span class="p">)</span>
<span class="k">if</span> <span class="n">return_context</span><span class="p">:</span>
<span class="k">return</span> <span class="n">proto</span><span class="p">,</span> <span class="n">context</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">proto</span></div>
<div class="viewcode-block" id="Pipeline.from_runner_api"><a class="viewcode-back" href="../../apache_beam.pipeline.html#apache_beam.pipeline.Pipeline.from_runner_api">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">from_runner_api</span><span class="p">(</span><span class="n">proto</span><span class="p">,</span> <span class="n">runner</span><span class="p">,</span> <span class="n">options</span><span class="p">,</span> <span class="n">return_context</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">allow_proto_holders</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;For internal use only; no backwards-compatibility guarantees.&quot;&quot;&quot;</span>
<span class="n">p</span> <span class="o">=</span> <span class="n">Pipeline</span><span class="p">(</span><span class="n">runner</span><span class="o">=</span><span class="n">runner</span><span class="p">,</span> <span class="n">options</span><span class="o">=</span><span class="n">options</span><span class="p">)</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners</span> <span class="k">import</span> <span class="n">pipeline_context</span>
<span class="n">context</span> <span class="o">=</span> <span class="n">pipeline_context</span><span class="o">.</span><span class="n">PipelineContext</span><span class="p">(</span>
<span class="n">proto</span><span class="o">.</span><span class="n">components</span><span class="p">,</span> <span class="n">allow_proto_holders</span><span class="o">=</span><span class="n">allow_proto_holders</span><span class="p">)</span>
<span class="n">root_transform_id</span><span class="p">,</span> <span class="o">=</span> <span class="n">proto</span><span class="o">.</span><span class="n">root_transform_ids</span>
<span class="n">p</span><span class="o">.</span><span class="n">transforms_stack</span> <span class="o">=</span> <span class="p">[</span>
<span class="n">context</span><span class="o">.</span><span class="n">transforms</span><span class="o">.</span><span class="n">get_by_id</span><span class="p">(</span><span class="n">root_transform_id</span><span class="p">)]</span>
<span class="c1"># TODO(robertwb): These are only needed to continue construction. Omit?</span>
<span class="n">p</span><span class="o">.</span><span class="n">applied_labels</span> <span class="o">=</span> <span class="nb">set</span><span class="p">([</span>
<span class="n">t</span><span class="o">.</span><span class="n">unique_name</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">proto</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="o">.</span><span class="n">values</span><span class="p">()])</span>
<span class="k">for</span> <span class="nb">id</span> <span class="ow">in</span> <span class="n">proto</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">pcollections</span><span class="p">:</span>
<span class="n">pcollection</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="n">pcollections</span><span class="o">.</span><span class="n">get_by_id</span><span class="p">(</span><span class="nb">id</span><span class="p">)</span>
<span class="n">pcollection</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">=</span> <span class="n">p</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">pcollection</span><span class="o">.</span><span class="n">producer</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;No producer for </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="nb">id</span><span class="p">)</span>
<span class="c1"># Inject PBegin input where necessary.</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.iobase</span> <span class="k">import</span> <span class="n">Read</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.core</span> <span class="k">import</span> <span class="n">Create</span>
<span class="n">has_pbegin</span> <span class="o">=</span> <span class="p">[</span><span class="n">Read</span><span class="p">,</span> <span class="n">Create</span><span class="p">]</span>
<span class="k">for</span> <span class="nb">id</span> <span class="ow">in</span> <span class="n">proto</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">:</span>
<span class="n">transform</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="n">transforms</span><span class="o">.</span><span class="n">get_by_id</span><span class="p">(</span><span class="nb">id</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">transform</span><span class="o">.</span><span class="n">inputs</span> <span class="ow">and</span> <span class="n">transform</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="vm">__class__</span> <span class="ow">in</span> <span class="n">has_pbegin</span><span class="p">:</span>
<span class="n">transform</span><span class="o">.</span><span class="n">inputs</span> <span class="o">=</span> <span class="p">(</span><span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">(</span><span class="n">p</span><span class="p">),)</span>
<span class="k">if</span> <span class="n">return_context</span><span class="p">:</span>
<span class="k">return</span> <span class="n">p</span><span class="p">,</span> <span class="n">context</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">p</span></div></div>
<span class="k">class</span> <span class="nc">PipelineVisitor</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;For internal use only; no backwards-compatibility guarantees.</span>
<span class="sd"> Visitor pattern class used to traverse a DAG of transforms</span>
<span class="sd"> (used internally by Pipeline for bookeeping purposes).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">visit_value</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">,</span> <span class="n">producer_node</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Callback for visiting a PValue in the pipeline DAG.</span>
<span class="sd"> Args:</span>
<span class="sd"> value: PValue visited (typically a PCollection instance).</span>
<span class="sd"> producer_node: AppliedPTransform object whose transform produced the</span>
<span class="sd"> pvalue.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">visit_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Callback for visiting a transform leaf node in the pipeline DAG.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">enter_composite_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Callback for entering traversal of a composite transform node.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">leave_composite_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Callback for leaving traversal of a composite transform node.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<span class="k">class</span> <span class="nc">AppliedPTransform</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;For internal use only; no backwards-compatibility guarantees.</span>
<span class="sd"> A transform node representing an instance of applying a PTransform</span>
<span class="sd"> (used internally by Pipeline for bookeeping purposes).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">parent</span><span class="p">,</span> <span class="n">transform</span><span class="p">,</span> <span class="n">full_label</span><span class="p">,</span> <span class="n">inputs</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">parent</span> <span class="o">=</span> <span class="n">parent</span>
<span class="bp">self</span><span class="o">.</span><span class="n">transform</span> <span class="o">=</span> <span class="n">transform</span>
<span class="c1"># Note that we want the PipelineVisitor classes to use the full_label,</span>
<span class="c1"># inputs, side_inputs, and outputs fields from this instance instead of the</span>
<span class="c1"># ones of the PTransform instance associated with it. Doing this permits</span>
<span class="c1"># reusing PTransform instances in different contexts (apply() calls) without</span>
<span class="c1"># any interference. This is particularly useful for composite transforms.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">full_label</span> <span class="o">=</span> <span class="n">full_label</span>
<span class="bp">self</span><span class="o">.</span><span class="n">inputs</span> <span class="o">=</span> <span class="n">inputs</span> <span class="ow">or</span> <span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">side_inputs</span> <span class="o">=</span> <span class="p">()</span> <span class="k">if</span> <span class="n">transform</span> <span class="ow">is</span> <span class="kc">None</span> <span class="k">else</span> <span class="nb">tuple</span><span class="p">(</span><span class="n">transform</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">outputs</span> <span class="o">=</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">parts</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s2">&quot;</span><span class="si">%s</span><span class="s2">(</span><span class="si">%s</span><span class="s2">, </span><span class="si">%s</span><span class="s2">)&quot;</span> <span class="o">%</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">full_label</span><span class="p">,</span>
<span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">transform</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">replace_output</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">output</span><span class="p">,</span> <span class="n">tag</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Replaces the output defined by the given tag with the given output.</span>
<span class="sd"> Args:</span>
<span class="sd"> output: replacement output</span>
<span class="sd"> tag: tag of the output to be replaced.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">output</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">DoOutputsTuple</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">replace_output</span><span class="p">(</span><span class="n">output</span><span class="p">[</span><span class="n">output</span><span class="o">.</span><span class="n">_main_tag</span><span class="p">])</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">output</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PValue</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">outputs</span><span class="p">[</span><span class="n">tag</span><span class="p">]</span> <span class="o">=</span> <span class="n">output</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;Unexpected output type: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">output</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">add_output</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">output</span><span class="p">,</span> <span class="n">tag</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">output</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">DoOutputsTuple</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">add_output</span><span class="p">(</span><span class="n">output</span><span class="p">[</span><span class="n">output</span><span class="o">.</span><span class="n">_main_tag</span><span class="p">])</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">output</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PValue</span><span class="p">):</span>
<span class="c1"># TODO(BEAM-1833): Require tags when calling this method.</span>
<span class="k">if</span> <span class="n">tag</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">and</span> <span class="kc">None</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">outputs</span><span class="p">:</span>
<span class="n">tag</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">outputs</span><span class="p">)</span>
<span class="k">assert</span> <span class="n">tag</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">outputs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">outputs</span><span class="p">[</span><span class="n">tag</span><span class="p">]</span> <span class="o">=</span> <span class="n">output</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;Unexpected output type: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">output</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">add_part</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">part</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">part</span><span class="p">,</span> <span class="n">AppliedPTransform</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">parts</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">part</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">is_composite</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns whether this is a composite transform.</span>
<span class="sd"> A composite transform has parts (inner transforms) or isn&#39;t the</span>
<span class="sd"> producer for any of its outputs. (An example of a transform that</span>
<span class="sd"> is not a producer is one that returns its inputs instead.)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="nb">bool</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">parts</span><span class="p">)</span> <span class="ow">or</span> <span class="nb">all</span><span class="p">(</span>
<span class="n">pval</span><span class="o">.</span><span class="n">producer</span> <span class="ow">is</span> <span class="ow">not</span> <span class="bp">self</span> <span class="k">for</span> <span class="n">pval</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">outputs</span><span class="o">.</span><span class="n">values</span><span class="p">())</span>
<span class="k">def</span> <span class="nf">visit</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">visitor</span><span class="p">,</span> <span class="n">pipeline</span><span class="p">,</span> <span class="n">visited</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Visits all nodes reachable from the current node.&quot;&quot;&quot;</span>
<span class="k">for</span> <span class="n">pval</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">inputs</span><span class="p">:</span>
<span class="k">if</span> <span class="n">pval</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">visited</span> <span class="ow">and</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pval</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PBegin</span><span class="p">):</span>
<span class="k">if</span> <span class="n">pval</span><span class="o">.</span><span class="n">producer</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">pval</span><span class="o">.</span><span class="n">producer</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">visitor</span><span class="p">,</span> <span class="n">pipeline</span><span class="p">,</span> <span class="n">visited</span><span class="p">)</span>
<span class="c1"># The value should be visited now since we visit outputs too.</span>
<span class="k">assert</span> <span class="n">pval</span> <span class="ow">in</span> <span class="n">visited</span><span class="p">,</span> <span class="n">pval</span>
<span class="c1"># Visit side inputs.</span>
<span class="k">for</span> <span class="n">pval</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pval</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">AsSideInput</span><span class="p">)</span> <span class="ow">and</span> <span class="n">pval</span><span class="o">.</span><span class="n">pvalue</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">visited</span><span class="p">:</span>
<span class="n">pval</span> <span class="o">=</span> <span class="n">pval</span><span class="o">.</span><span class="n">pvalue</span> <span class="c1"># Unpack marker-object-wrapped pvalue.</span>
<span class="k">if</span> <span class="n">pval</span><span class="o">.</span><span class="n">producer</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">pval</span><span class="o">.</span><span class="n">producer</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">visitor</span><span class="p">,</span> <span class="n">pipeline</span><span class="p">,</span> <span class="n">visited</span><span class="p">)</span>
<span class="c1"># The value should be visited now since we visit outputs too.</span>
<span class="k">assert</span> <span class="n">pval</span> <span class="ow">in</span> <span class="n">visited</span>
<span class="c1"># TODO(silviuc): Is there a way to signal that we are visiting a side</span>
<span class="c1"># value? The issue is that the same PValue can be reachable through</span>
<span class="c1"># multiple paths and therefore it is not guaranteed that the value</span>
<span class="c1"># will be visited as a side value.</span>
<span class="c1"># Visit a composite or primitive transform.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_composite</span><span class="p">():</span>
<span class="n">visitor</span><span class="o">.</span><span class="n">enter_composite_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="k">for</span> <span class="n">part</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">parts</span><span class="p">:</span>
<span class="n">part</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">visitor</span><span class="p">,</span> <span class="n">pipeline</span><span class="p">,</span> <span class="n">visited</span><span class="p">)</span>
<span class="n">visitor</span><span class="o">.</span><span class="n">leave_composite_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">visitor</span><span class="o">.</span><span class="n">visit_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="c1"># Visit the outputs (one or more). It is essential to mark as visited the</span>
<span class="c1"># tagged PCollections of the DoOutputsTuple object. A tagged PCollection is</span>
<span class="c1"># connected directly with its producer (a multi-output ParDo), but the</span>
<span class="c1"># output of such a transform is the containing DoOutputsTuple, not the</span>
<span class="c1"># PCollection inside it. Without the code below a tagged PCollection will</span>
<span class="c1"># not be marked as visited while visiting its producer.</span>
<span class="k">for</span> <span class="n">pval</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">outputs</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pval</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">DoOutputsTuple</span><span class="p">):</span>
<span class="n">pvals</span> <span class="o">=</span> <span class="p">(</span><span class="n">v</span> <span class="k">for</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">pval</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">pvals</span> <span class="o">=</span> <span class="p">(</span><span class="n">pval</span><span class="p">,)</span>
<span class="k">for</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">pvals</span><span class="p">:</span>
<span class="k">if</span> <span class="n">v</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">visited</span><span class="p">:</span>
<span class="n">visited</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">v</span><span class="p">)</span>
<span class="n">visitor</span><span class="o">.</span><span class="n">visit_value</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">named_inputs</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># TODO(BEAM-1833): Push names up into the sdk construction.</span>
<span class="n">main_inputs</span> <span class="o">=</span> <span class="p">{</span><span class="nb">str</span><span class="p">(</span><span class="n">ix</span><span class="p">):</span> <span class="nb">input</span>
<span class="k">for</span> <span class="n">ix</span><span class="p">,</span> <span class="nb">input</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">inputs</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="nb">input</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">)}</span>
<span class="n">side_inputs</span> <span class="o">=</span> <span class="p">{</span><span class="s1">&#39;side</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">ix</span><span class="p">:</span> <span class="n">si</span><span class="o">.</span><span class="n">pvalue</span>
<span class="k">for</span> <span class="n">ix</span><span class="p">,</span> <span class="n">si</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">)}</span>
<span class="k">return</span> <span class="nb">dict</span><span class="p">(</span><span class="n">main_inputs</span><span class="p">,</span> <span class="o">**</span><span class="n">side_inputs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">named_outputs</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="nb">str</span><span class="p">(</span><span class="n">tag</span><span class="p">):</span> <span class="n">output</span> <span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">output</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">outputs</span><span class="o">.</span><span class="n">items</span><span class="p">()</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">output</span><span class="p">,</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">)}</span>
<span class="k">def</span> <span class="nf">to_runner_api</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="c1"># External tranforms require more splicing than just setting the spec.</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="k">import</span> <span class="n">external</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">transform</span><span class="p">,</span> <span class="n">external</span><span class="o">.</span><span class="n">ExternalTransform</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">to_runner_api_transform</span><span class="p">(</span><span class="n">context</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">full_label</span><span class="p">)</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability.api</span> <span class="k">import</span> <span class="n">beam_runner_api_pb2</span>
<span class="k">def</span> <span class="nf">transform_to_runner_api</span><span class="p">(</span><span class="n">transform</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">if</span> <span class="n">transform</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">None</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">transform</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">(</span><span class="n">context</span><span class="p">,</span> <span class="n">has_parts</span><span class="o">=</span><span class="nb">bool</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">parts</span><span class="p">))</span>
<span class="c1"># Iterate over inputs and outputs by sorted key order, so that ids are</span>
<span class="c1"># consistently generated for multiple runs of the same pipeline.</span>
<span class="k">return</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">PTransform</span><span class="p">(</span>
<span class="n">unique_name</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">full_label</span><span class="p">,</span>
<span class="n">spec</span><span class="o">=</span><span class="n">transform_to_runner_api</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">transform</span><span class="p">,</span> <span class="n">context</span><span class="p">),</span>
<span class="n">subtransforms</span><span class="o">=</span><span class="p">[</span><span class="n">context</span><span class="o">.</span><span class="n">transforms</span><span class="o">.</span><span class="n">get_id</span><span class="p">(</span><span class="n">part</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="n">part</span><span class="o">.</span><span class="n">full_label</span><span class="p">)</span>
<span class="k">for</span> <span class="n">part</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">parts</span><span class="p">],</span>
<span class="n">inputs</span><span class="o">=</span><span class="p">{</span><span class="n">tag</span><span class="p">:</span> <span class="n">context</span><span class="o">.</span><span class="n">pcollections</span><span class="o">.</span><span class="n">get_id</span><span class="p">(</span><span class="n">pc</span><span class="p">)</span>
<span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">pc</span> <span class="ow">in</span> <span class="nb">sorted</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">named_inputs</span><span class="p">()</span><span class="o">.</span><span class="n">items</span><span class="p">())},</span>
<span class="n">outputs</span><span class="o">=</span><span class="p">{</span><span class="nb">str</span><span class="p">(</span><span class="n">tag</span><span class="p">):</span> <span class="n">context</span><span class="o">.</span><span class="n">pcollections</span><span class="o">.</span><span class="n">get_id</span><span class="p">(</span><span class="n">out</span><span class="p">)</span>
<span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">out</span> <span class="ow">in</span> <span class="nb">sorted</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">named_outputs</span><span class="p">()</span><span class="o">.</span><span class="n">items</span><span class="p">())},</span>
<span class="c1"># TODO(BEAM-115): display_data</span>
<span class="n">display_data</span><span class="o">=</span><span class="kc">None</span><span class="p">)</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">from_runner_api</span><span class="p">(</span><span class="n">proto</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">is_side_input</span><span class="p">(</span><span class="n">tag</span><span class="p">):</span>
<span class="c1"># As per named_inputs() above.</span>
<span class="k">return</span> <span class="n">tag</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;side&#39;</span><span class="p">)</span>
<span class="n">main_inputs</span> <span class="o">=</span> <span class="p">[</span><span class="n">context</span><span class="o">.</span><span class="n">pcollections</span><span class="o">.</span><span class="n">get_by_id</span><span class="p">(</span><span class="nb">id</span><span class="p">)</span>
<span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="nb">id</span> <span class="ow">in</span> <span class="n">proto</span><span class="o">.</span><span class="n">inputs</span><span class="o">.</span><span class="n">items</span><span class="p">()</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">is_side_input</span><span class="p">(</span><span class="n">tag</span><span class="p">)]</span>
<span class="c1"># Ordering is important here.</span>
<span class="n">indexed_side_inputs</span> <span class="o">=</span> <span class="p">[(</span><span class="nb">int</span><span class="p">(</span><span class="n">re</span><span class="o">.</span><span class="n">match</span><span class="p">(</span><span class="s1">&#39;side([0-9]+)(-.*)?$&#39;</span><span class="p">,</span> <span class="n">tag</span><span class="p">)</span><span class="o">.</span><span class="n">group</span><span class="p">(</span><span class="mi">1</span><span class="p">)),</span>
<span class="n">context</span><span class="o">.</span><span class="n">pcollections</span><span class="o">.</span><span class="n">get_by_id</span><span class="p">(</span><span class="nb">id</span><span class="p">))</span>
<span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="nb">id</span> <span class="ow">in</span> <span class="n">proto</span><span class="o">.</span><span class="n">inputs</span><span class="o">.</span><span class="n">items</span><span class="p">()</span>
<span class="k">if</span> <span class="n">is_side_input</span><span class="p">(</span><span class="n">tag</span><span class="p">)]</span>
<span class="n">side_inputs</span> <span class="o">=</span> <span class="p">[</span><span class="n">si</span> <span class="k">for</span> <span class="n">_</span><span class="p">,</span> <span class="n">si</span> <span class="ow">in</span> <span class="nb">sorted</span><span class="p">(</span><span class="n">indexed_side_inputs</span><span class="p">)]</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">AppliedPTransform</span><span class="p">(</span>
<span class="n">parent</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">transform</span><span class="o">=</span><span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="o">.</span><span class="n">from_runner_api</span><span class="p">(</span><span class="n">proto</span><span class="o">.</span><span class="n">spec</span><span class="p">,</span> <span class="n">context</span><span class="p">),</span>
<span class="n">full_label</span><span class="o">=</span><span class="n">proto</span><span class="o">.</span><span class="n">unique_name</span><span class="p">,</span>
<span class="n">inputs</span><span class="o">=</span><span class="n">main_inputs</span><span class="p">)</span>
<span class="k">if</span> <span class="n">result</span><span class="o">.</span><span class="n">transform</span> <span class="ow">and</span> <span class="n">result</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">:</span>
<span class="k">for</span> <span class="n">si</span><span class="p">,</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="nb">zip</span><span class="p">(</span><span class="n">result</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">,</span> <span class="n">side_inputs</span><span class="p">):</span>
<span class="n">si</span><span class="o">.</span><span class="n">pvalue</span> <span class="o">=</span> <span class="n">pcoll</span>
<span class="n">result</span><span class="o">.</span><span class="n">side_inputs</span> <span class="o">=</span> <span class="nb">tuple</span><span class="p">(</span><span class="n">result</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">)</span>
<span class="n">result</span><span class="o">.</span><span class="n">parts</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="n">proto</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">:</span>
<span class="n">part</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="n">transforms</span><span class="o">.</span><span class="n">get_by_id</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span>
<span class="n">part</span><span class="o">.</span><span class="n">parent</span> <span class="o">=</span> <span class="n">result</span>
<span class="n">result</span><span class="o">.</span><span class="n">parts</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">part</span><span class="p">)</span>
<span class="n">result</span><span class="o">.</span><span class="n">outputs</span> <span class="o">=</span> <span class="p">{</span>
<span class="kc">None</span> <span class="k">if</span> <span class="n">tag</span> <span class="o">==</span> <span class="s1">&#39;None&#39;</span> <span class="k">else</span> <span class="n">tag</span><span class="p">:</span> <span class="n">context</span><span class="o">.</span><span class="n">pcollections</span><span class="o">.</span><span class="n">get_by_id</span><span class="p">(</span><span class="nb">id</span><span class="p">)</span>
<span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="nb">id</span> <span class="ow">in</span> <span class="n">proto</span><span class="o">.</span><span class="n">outputs</span><span class="o">.</span><span class="n">items</span><span class="p">()}</span>
<span class="c1"># This annotation is expected by some runners.</span>
<span class="k">if</span> <span class="n">proto</span><span class="o">.</span><span class="n">spec</span><span class="o">.</span><span class="n">urn</span> <span class="o">==</span> <span class="n">common_urns</span><span class="o">.</span><span class="n">primitives</span><span class="o">.</span><span class="n">PAR_DO</span><span class="o">.</span><span class="n">urn</span><span class="p">:</span>
<span class="n">result</span><span class="o">.</span><span class="n">transform</span><span class="o">.</span><span class="n">output_tags</span> <span class="o">=</span> <span class="nb">set</span><span class="p">(</span><span class="n">proto</span><span class="o">.</span><span class="n">outputs</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span><span class="o">.</span><span class="n">difference</span><span class="p">(</span>
<span class="p">{</span><span class="s1">&#39;None&#39;</span><span class="p">})</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">result</span><span class="o">.</span><span class="n">parts</span><span class="p">:</span>
<span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">pcoll_id</span> <span class="ow">in</span> <span class="n">proto</span><span class="o">.</span><span class="n">outputs</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="k">if</span> <span class="n">pcoll_id</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">proto</span><span class="o">.</span><span class="n">inputs</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="n">pc</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="n">pcollections</span><span class="o">.</span><span class="n">get_by_id</span><span class="p">(</span><span class="n">pcoll_id</span><span class="p">)</span>
<span class="n">pc</span><span class="o">.</span><span class="n">producer</span> <span class="o">=</span> <span class="n">result</span>
<span class="n">pc</span><span class="o">.</span><span class="n">tag</span> <span class="o">=</span> <span class="kc">None</span> <span class="k">if</span> <span class="n">tag</span> <span class="o">==</span> <span class="s1">&#39;None&#39;</span> <span class="k">else</span> <span class="n">tag</span>
<span class="k">return</span> <span class="n">result</span>
<div class="viewcode-block" id="PTransformOverride"><a class="viewcode-back" href="../../apache_beam.pipeline.html#apache_beam.pipeline.PTransformOverride">[docs]</a><span class="k">class</span> <span class="nc">PTransformOverride</span><span class="p">(</span><span class="n">with_metaclass</span><span class="p">(</span><span class="n">abc</span><span class="o">.</span><span class="n">ABCMeta</span><span class="p">,</span> <span class="nb">object</span><span class="p">)):</span>
<span class="sd">&quot;&quot;&quot;For internal use only; no backwards-compatibility guarantees.</span>
<span class="sd"> Gives a matcher and replacements for matching PTransforms.</span>
<span class="sd"> TODO: Update this to support cases where input and/our output types are</span>
<span class="sd"> different.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="PTransformOverride.matches"><a class="viewcode-back" href="../../apache_beam.pipeline.html#apache_beam.pipeline.PTransformOverride.matches">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span>
<span class="k">def</span> <span class="nf">matches</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">applied_ptransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Determines whether the given AppliedPTransform matches.</span>
<span class="sd"> Note that the matching will happen *after* Runner API proto translation.</span>
<span class="sd"> If matching is done via type checks, to/from_runner_api[_parameter] methods</span>
<span class="sd"> must be implemented to preserve the type (and other data) through proto</span>
<span class="sd"> serialization.</span>
<span class="sd"> Consider URN-based translation instead.</span>
<span class="sd"> Args:</span>
<span class="sd"> applied_ptransform: AppliedPTransform to be matched.</span>
<span class="sd"> Returns:</span>
<span class="sd"> a bool indicating whether the given AppliedPTransform is a match.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<div class="viewcode-block" id="PTransformOverride.get_replacement_transform"><a class="viewcode-back" href="../../apache_beam.pipeline.html#apache_beam.pipeline.PTransformOverride.get_replacement_transform">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span>
<span class="k">def</span> <span class="nf">get_replacement_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ptransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Provides a runner specific override for a given PTransform.</span>
<span class="sd"> Args:</span>
<span class="sd"> ptransform: PTransform to be replaced.</span>
<span class="sd"> Returns:</span>
<span class="sd"> A PTransform that will be the replacement for the PTransform given as an</span>
<span class="sd"> argument.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Returns a PTransformReplacement</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div>
</pre></div>
</div>
<div class="articleComments">
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
&copy; Copyright .
</p>
</div>
Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script type="text/javascript">
var DOCUMENTATION_OPTIONS = {
URL_ROOT:'../../',
VERSION:'',
COLLAPSE_INDEX:false,
FILE_SUFFIX:'.html',
HAS_SOURCE: true,
SOURCELINK_SUFFIX: '.txt'
};
</script>
<script type="text/javascript" src="../../_static/jquery.js"></script>
<script type="text/javascript" src="../../_static/underscore.js"></script>
<script type="text/javascript" src="../../_static/doctools.js"></script>
<script type="text/javascript" src="../../_static/js/theme.js"></script>
<script type="text/javascript">
jQuery(function () {
SphinxRtdTheme.StickyNav.enable();
});
</script>
</body>
</html>