blob: daf43440a0809715a9562a579ef9c0e9b9c177f3 [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>apache_beam.runners.render &mdash; Apache Beam 2.47.0 documentation</title>
<script type="text/javascript" src="../../../_static/js/modernizr.min.js"></script>
<script type="text/javascript" id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script>
<script type="text/javascript" src="../../../_static/jquery.js"></script>
<script type="text/javascript" src="../../../_static/underscore.js"></script>
<script type="text/javascript" src="../../../_static/doctools.js"></script>
<script type="text/javascript" src="../../../_static/language_data.js"></script>
<script async="async" type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
<script type="text/javascript" src="../../../_static/js/theme.js"></script>
<link rel="stylesheet" href="../../../_static/css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../../_static/pygments.css" type="text/css" />
<link rel="index" title="Index" href="../../../genindex.html" />
<link rel="search" title="Search" href="../../../search.html" />
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../index.html" class="icon icon-home"> Apache Beam
</a>
<div class="version">
2.47.0
</div>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li>
</ul>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../index.html">Module code</a> &raquo;</li>
<li>apache_beam.runners.render</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.runners.render</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;A portable &quot;runner&quot; that renders a beam graph.</span>
<span class="sd">This runner can either render the graph to a (set of) output path(s), as</span>
<span class="sd">designated by (possibly repeated) --render_output, or serve the pipeline as</span>
<span class="sd">an interactive graph, if --render_port is set.</span>
<span class="sd">In Python, this runner can be passed directly at pipeline construction, e.g.::</span>
<span class="sd"> with beam.Pipeline(runner=beam.runners.render.RenderRunner(), options=...)</span>
<span class="sd">For other languages, start this service a by running::</span>
<span class="sd"> python -m apache_beam.runners.render --job_port=PORT ...</span>
<span class="sd">and then run your pipline with the PortableRunner setting the job endpoint</span>
<span class="sd">to `localhost:PORT`.</span>
<span class="sd">If any `--render_output=path.ext` flags are passed, each submitted job will</span>
<span class="sd">get written to the given output (overwriting any previously existing file).</span>
<span class="sd">If `--render_port` is set to a non-negative value, a local http server will</span>
<span class="sd">be started which allows for interactive exploration of the pipeline graph.</span>
<span class="sd">As an alternative to starting a job server, a single pipeline can be rendered</span>
<span class="sd">by passing a pipeline proto file to `--pipeline_proto`. For example</span>
<span class="sd"> python -m apache_beam.runners.render \\</span>
<span class="sd"> --pipeline_proto gs://&lt;staging_location&gt;/pipeline.pb \\</span>
<span class="sd"> --render_output=/tmp/pipeline.svg</span>
<span class="sd">Requires the graphviz dot executable to be available in the path.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="kn">import</span> <span class="nn">argparse</span>
<span class="kn">import</span> <span class="nn">base64</span>
<span class="kn">import</span> <span class="nn">collections</span>
<span class="kn">import</span> <span class="nn">http.server</span>
<span class="kn">import</span> <span class="nn">json</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">os</span>
<span class="kn">import</span> <span class="nn">re</span>
<span class="kn">import</span> <span class="nn">subprocess</span>
<span class="kn">import</span> <span class="nn">sys</span>
<span class="kn">import</span> <span class="nn">tempfile</span>
<span class="kn">import</span> <span class="nn">threading</span>
<span class="kn">import</span> <span class="nn">time</span>
<span class="kn">import</span> <span class="nn">urllib.parse</span>
<span class="kn">from</span> <span class="nn">google.protobuf</span> <span class="kn">import</span> <span class="n">json_format</span>
<span class="kn">from</span> <span class="nn">google.protobuf</span> <span class="kn">import</span> <span class="n">text_format</span> <span class="c1"># type: ignore</span>
<span class="kn">from</span> <span class="nn">apache_beam.options</span> <span class="kn">import</span> <span class="n">pipeline_options</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability.api</span> <span class="kn">import</span> <span class="n">beam_runner_api_pb2</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners</span> <span class="kn">import</span> <span class="n">runner</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.portability</span> <span class="kn">import</span> <span class="n">local_job_service</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.portability</span> <span class="kn">import</span> <span class="n">local_job_service_main</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.portability.fn_api_runner</span> <span class="kn">import</span> <span class="n">translations</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp</span> <span class="kn">import</span> <span class="n">gcsio</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="n">gcsio</span> <span class="o">=</span> <span class="kc">None</span> <span class="c1"># type: ignore</span>
<span class="c1"># From the Beam site, circa November 2022.</span>
<span class="n">DEFAULT_EDGE_STYLE</span> <span class="o">=</span> <span class="s1">&#39;color=&quot;#ff570b&quot;&#39;</span>
<span class="n">DEFAULT_TRANSFORM_STYLE</span> <span class="o">=</span> <span class="p">(</span>
<span class="s1">&#39;shape=rect style=&quot;rounded, filled&quot; color=&quot;#ff570b&quot; fillcolor=&quot;#fff6dd&quot;&#39;</span><span class="p">)</span>
<span class="n">DEFAULT_HIGHLIGHT_STYLE</span> <span class="o">=</span> <span class="p">(</span>
<span class="s1">&#39;shape=rect style=&quot;rounded, filled&quot; color=&quot;#ff570b&quot; fillcolor=&quot;#ffdb97&quot;&#39;</span><span class="p">)</span>
<div class="viewcode-block" id="RenderOptions"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.RenderOptions">[docs]</a><span class="k">class</span> <span class="nc">RenderOptions</span><span class="p">(</span><span class="n">pipeline_options</span><span class="o">.</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Rendering options.&quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--render_port&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=-</span><span class="mi">1</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The port at which to serve the graph. &#39;</span>
<span class="s1">&#39;If 0, an unused port will be chosen. &#39;</span>
<span class="s1">&#39;If -1, the server will not be started.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--render_output&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;A path or paths to which to write rendered output. &#39;</span>
<span class="s1">&#39;The output type will be deduced from the file extension.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--render_leaf_composite_nodes&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;A set of regular expressions for transform names that should &#39;</span>
<span class="s1">&#39;not be expanded. For example, one could pass &quot;</span><span class="se">\b</span><span class="s1">Read.*&quot; to indicate &#39;</span>
<span class="s1">&#39;the inner structure of read nodes should not be expanded. &#39;</span>
<span class="s1">&#39;If not given, defaults to the top-level nodes if interactively &#39;</span>
<span class="s1">&#39;serving the graph and expanding all nodes otherwise.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--render_edge_attributes&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Graphviz attributes to add to all edges.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--render_node_attributes&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Graphviz attributes to add to all nodes.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--render_highlight_attributes&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Graphviz attributes to add to all highlighted nodes.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--log_proto&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Set to also log input pipeline proto to stdout.&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="n">parser</span></div>
<div class="viewcode-block" id="PipelineRenderer"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer">[docs]</a><span class="k">class</span> <span class="nc">PipelineRenderer</span><span class="p">:</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pipeline</span><span class="p">,</span> <span class="n">options</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">=</span> <span class="n">pipeline</span>
<span class="bp">self</span><span class="o">.</span><span class="n">options</span> <span class="o">=</span> <span class="n">options</span>
<span class="c1"># Drill down into any uninteresting, top-level transforms that contain</span>
<span class="c1"># the whole pipeline (often added by the SDK).</span>
<span class="n">roots</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">root_transform_ids</span>
<span class="k">while</span> <span class="nb">len</span><span class="p">(</span><span class="n">roots</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="n">root</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">roots</span><span class="p">[</span><span class="mi">0</span><span class="p">]]</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">root</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">:</span>
<span class="k">break</span>
<span class="n">roots</span> <span class="o">=</span> <span class="n">root</span><span class="o">.</span><span class="n">subtransforms</span>
<span class="bp">self</span><span class="o">.</span><span class="n">roots</span> <span class="o">=</span> <span class="n">roots</span>
<span class="c1"># Figure out at what point to stop rendering composite internals.</span>
<span class="k">if</span> <span class="n">options</span><span class="o">.</span><span class="n">render_leaf_composite_nodes</span><span class="p">:</span>
<span class="n">is_leaf</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">name</span><span class="p">:</span> <span class="nb">any</span><span class="p">(</span>
<span class="n">re</span><span class="o">.</span><span class="n">match</span><span class="p">(</span><span class="n">pattern</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span>
<span class="k">for</span> <span class="n">patterns</span> <span class="ow">in</span> <span class="n">options</span><span class="o">.</span><span class="n">render_leaf_composite_nodes</span>
<span class="k">for</span> <span class="n">pattern</span> <span class="ow">in</span> <span class="n">patterns</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;,&#39;</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">mark_leaves</span><span class="p">(</span><span class="n">transform_ids</span><span class="p">):</span>
<span class="k">for</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="n">transform_ids</span><span class="p">:</span>
<span class="k">if</span> <span class="n">is_leaf</span><span class="p">(</span><span class="n">transform_id</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">mark_leaves</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">options</span><span class="o">.</span><span class="n">render_port</span> <span class="o">&gt;=</span> <span class="mi">0</span><span class="p">:</span>
<span class="c1"># Start interactive with no unfolding.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span> <span class="o">=</span> <span class="nb">set</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">roots</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># For non-interactive, expand fully.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="c1"># Useful for attempting graph layout consistency.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span> <span class="o">=</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">highlighted</span> <span class="o">=</span> <span class="p">[]</span>
<div class="viewcode-block" id="PipelineRenderer.update"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.update">[docs]</a> <span class="k">def</span> <span class="nf">update</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">toggle</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="n">toggle</span><span class="p">:</span>
<span class="n">transform_id</span> <span class="o">=</span> <span class="n">toggle</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">highlighted</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_id</span><span class="p">]</span>
<span class="k">if</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span><span class="p">:</span>
<span class="n">transform</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span>
<span class="k">if</span> <span class="n">transform</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span><span class="o">.</span><span class="n">remove</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span>
<span class="k">for</span> <span class="n">subtransform</span> <span class="ow">in</span> <span class="n">transform</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">subtransform</span><span class="p">)</span>
<span class="k">if</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">[</span><span class="n">subtransform</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">[</span>
<span class="n">transform_id</span><span class="p">]</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span></div>
<div class="viewcode-block" id="PipelineRenderer.style"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.style">[docs]</a> <span class="k">def</span> <span class="nf">style</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_id</span><span class="p">):</span>
<span class="n">base</span> <span class="o">=</span> <span class="s1">&#39; &#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span>
<span class="p">[</span><span class="n">DEFAULT_TRANSFORM_STYLE</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">render_node_attributes</span><span class="p">])</span>
<span class="k">if</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">highlighted</span><span class="p">:</span>
<span class="k">return</span> <span class="s1">&#39; &#39;</span><span class="o">.</span><span class="n">join</span><span class="p">([</span>
<span class="n">base</span><span class="p">,</span>
<span class="n">DEFAULT_HIGHLIGHT_STYLE</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">render_highlight_attributes</span>
<span class="p">])</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">base</span></div>
<div class="viewcode-block" id="PipelineRenderer.to_dot"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.to_dot">[docs]</a> <span class="k">def</span> <span class="nf">to_dot</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s1">&#39;</span><span class="se">\n</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">to_dot_iter</span><span class="p">())</span></div>
<div class="viewcode-block" id="PipelineRenderer.to_dot_iter"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.to_dot_iter">[docs]</a> <span class="k">def</span> <span class="nf">to_dot_iter</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">yield</span> <span class="s1">&#39;digraph G {&#39;</span>
<span class="c1"># Defer drawing any edges until the end lest we declare nodes too early.</span>
<span class="n">edges_out</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">roots</span><span class="p">:</span>
<span class="k">yield from</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform_to_dot</span><span class="p">(</span>
<span class="n">transform_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">pcoll_leaf_consumers</span><span class="p">(),</span> <span class="n">edges_out</span><span class="p">)</span>
<span class="k">yield from</span> <span class="n">edges_out</span>
<span class="k">yield</span> <span class="s1">&#39;}&#39;</span></div>
<div class="viewcode-block" id="PipelineRenderer.transform_to_dot"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.transform_to_dot">[docs]</a> <span class="k">def</span> <span class="nf">transform_to_dot</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_id</span><span class="p">,</span> <span class="n">pcoll_leaf_consumers</span><span class="p">,</span> <span class="n">edges_out</span><span class="p">):</span>
<span class="n">transform</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_leaf</span><span class="p">(</span><span class="n">transform_id</span><span class="p">):</span>
<span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform_node</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span>
<span class="n">transform_inputs</span> <span class="o">=</span> <span class="nb">set</span><span class="p">(</span><span class="n">transform</span><span class="o">.</span><span class="n">inputs</span><span class="o">.</span><span class="n">values</span><span class="p">())</span>
<span class="k">for</span> <span class="n">name</span><span class="p">,</span> <span class="n">output</span> <span class="ow">in</span> <span class="n">transform</span><span class="o">.</span><span class="n">outputs</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="c1"># For outputs that are also inputs, it&#39;s ambiguous whether they are</span>
<span class="c1"># consumed as the outputs of this transform, or of the upstream</span>
<span class="c1"># transform. Render the latter.</span>
<span class="k">if</span> <span class="n">output</span> <span class="ow">in</span> <span class="n">transform_inputs</span><span class="p">:</span>
<span class="k">continue</span>
<span class="n">output_label</span> <span class="o">=</span> <span class="n">name</span> <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">transform</span><span class="o">.</span><span class="n">outputs</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">1</span> <span class="k">else</span> <span class="s1">&#39;&#39;</span>
<span class="k">for</span> <span class="n">consumer</span><span class="p">,</span> <span class="n">is_side_input</span> <span class="ow">in</span> <span class="n">pcoll_leaf_consumers</span><span class="p">[</span><span class="n">output</span><span class="p">]:</span>
<span class="c1"># Can&#39;t yield this here as the consumer might not be in this cluster.</span>
<span class="n">edge_style</span> <span class="o">=</span> <span class="s1">&#39;dashed&#39;</span> <span class="k">if</span> <span class="n">is_side_input</span> <span class="k">else</span> <span class="s1">&#39;solid&#39;</span>
<span class="n">edge_attributes</span> <span class="o">=</span> <span class="s1">&#39; &#39;</span><span class="o">.</span><span class="n">join</span><span class="p">([</span>
<span class="sa">f</span><span class="s1">&#39;label=&quot;</span><span class="si">{</span><span class="n">output_label</span><span class="si">}</span><span class="s1">&quot; style=</span><span class="si">{</span><span class="n">edge_style</span><span class="si">}</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">DEFAULT_EDGE_STYLE</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">render_edge_attributes</span>
<span class="p">])</span>
<span class="n">edges_out</span><span class="o">.</span><span class="n">append</span><span class="p">(</span>
<span class="sa">f</span><span class="s1">&#39;&quot;</span><span class="si">{</span><span class="n">transform_id</span><span class="si">}</span><span class="s1">&quot; -&gt; &quot;</span><span class="si">{</span><span class="n">consumer</span><span class="si">}</span><span class="s1">&quot; [</span><span class="si">{</span><span class="n">edge_attributes</span><span class="si">}</span><span class="s1">]&#39;</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="sa">f</span><span class="s1">&#39;subgraph &quot;cluster_</span><span class="si">{</span><span class="n">transform_id</span><span class="si">}</span><span class="s1">&quot; </span><span class="se">{{</span><span class="s1">&#39;</span>
<span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform_attributes</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span>
<span class="k">for</span> <span class="n">subtransform</span> <span class="ow">in</span> <span class="n">transform</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">:</span>
<span class="k">yield from</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform_to_dot</span><span class="p">(</span>
<span class="n">subtransform</span><span class="p">,</span> <span class="n">pcoll_leaf_consumers</span><span class="p">,</span> <span class="n">edges_out</span><span class="p">)</span>
<span class="k">yield</span> <span class="s1">&#39;}&#39;</span></div>
<div class="viewcode-block" id="PipelineRenderer.transform_node"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.transform_node">[docs]</a> <span class="k">def</span> <span class="nf">transform_node</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_id</span><span class="p">):</span>
<span class="k">return</span> <span class="sa">f</span><span class="s1">&#39;&quot;</span><span class="si">{</span><span class="n">transform_id</span><span class="si">}</span><span class="s1">&quot; [</span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">transform_attributes</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span><span class="si">}</span><span class="s1">]&#39;</span></div>
<div class="viewcode-block" id="PipelineRenderer.transform_attributes"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.transform_attributes">[docs]</a> <span class="k">def</span> <span class="nf">transform_attributes</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_id</span><span class="p">):</span>
<span class="n">transform</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span>
<span class="n">local_name</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">unique_name</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;/&#39;</span><span class="p">)[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span>
<span class="k">if</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">:</span>
<span class="n">pos_str</span> <span class="o">=</span> <span class="sa">f</span><span class="s1">&#39;pos=&quot;</span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span><span class="si">}</span><span class="s1">&quot;&#39;</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">pos_str</span> <span class="o">=</span> <span class="s1">&#39;&#39;</span>
<span class="k">return</span> <span class="p">(</span>
<span class="sa">f</span><span class="s1">&#39;label=&quot;</span><span class="si">{</span><span class="n">local_name</span><span class="si">}</span><span class="s1">&quot; </span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">style</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span><span class="si">}</span><span class="s1"> &#39;</span>
<span class="sa">f</span><span class="s1">&#39;URL=&quot;javascript:click(</span><span class="se">\&#39;</span><span class="si">{</span><span class="n">transform_id</span><span class="si">}</span><span class="se">\&#39;</span><span class="s1">)&quot; </span><span class="si">{</span><span class="n">pos_str</span><span class="si">}</span><span class="s1">&#39;</span><span class="p">)</span></div>
<div class="viewcode-block" id="PipelineRenderer.pcoll_leaf_consumers_iter"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.pcoll_leaf_consumers_iter">[docs]</a> <span class="k">def</span> <span class="nf">pcoll_leaf_consumers_iter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_id</span><span class="p">):</span>
<span class="n">transform</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span>
<span class="n">transform_inputs</span> <span class="o">=</span> <span class="nb">set</span><span class="p">(</span><span class="n">transform</span><span class="o">.</span><span class="n">inputs</span><span class="o">.</span><span class="n">values</span><span class="p">())</span>
<span class="n">side_inputs</span> <span class="o">=</span> <span class="nb">set</span><span class="p">(</span><span class="n">translations</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">(</span><span class="n">transform</span><span class="p">)</span><span class="o">.</span><span class="n">values</span><span class="p">())</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_leaf</span><span class="p">(</span><span class="n">transform_id</span><span class="p">):</span>
<span class="k">for</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">transform</span><span class="o">.</span><span class="n">inputs</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="k">yield</span> <span class="n">pcoll</span><span class="p">,</span> <span class="p">(</span><span class="n">transform_id</span><span class="p">,</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">side_inputs</span><span class="p">)</span>
<span class="k">for</span> <span class="n">subtransform</span> <span class="ow">in</span> <span class="n">transform</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">:</span>
<span class="k">for</span> <span class="n">pcoll</span><span class="p">,</span> <span class="p">(</span><span class="n">consumer</span><span class="p">,</span>
<span class="n">annotation</span><span class="p">)</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pcoll_leaf_consumers_iter</span><span class="p">(</span><span class="n">subtransform</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_leaf</span><span class="p">(</span><span class="n">transform_id</span><span class="p">):</span>
<span class="k">if</span> <span class="n">pcoll</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">transform_inputs</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">pcoll</span><span class="p">,</span> <span class="p">(</span><span class="n">transform_id</span><span class="p">,</span> <span class="n">annotation</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">pcoll</span><span class="p">,</span> <span class="p">(</span><span class="n">consumer</span><span class="p">,</span> <span class="n">annotation</span><span class="p">)</span></div>
<div class="viewcode-block" id="PipelineRenderer.pcoll_leaf_consumers"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.pcoll_leaf_consumers">[docs]</a> <span class="k">def</span> <span class="nf">pcoll_leaf_consumers</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">collections</span><span class="o">.</span><span class="n">defaultdict</span><span class="p">(</span><span class="nb">list</span><span class="p">)</span>
<span class="k">for</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">roots</span><span class="p">:</span>
<span class="k">for</span> <span class="n">pcoll</span><span class="p">,</span> <span class="n">consumer_info</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pcoll_leaf_consumers_iter</span><span class="p">(</span><span class="n">transform_id</span><span class="p">):</span>
<span class="n">result</span><span class="p">[</span><span class="n">pcoll</span><span class="p">]</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">consumer_info</span><span class="p">)</span>
<span class="k">return</span> <span class="n">result</span></div>
<div class="viewcode-block" id="PipelineRenderer.is_leaf"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.is_leaf">[docs]</a> <span class="k">def</span> <span class="nf">is_leaf</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_id</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span> <span class="ow">or</span>
<span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">)</span></div>
<div class="viewcode-block" id="PipelineRenderer.info"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.info">[docs]</a> <span class="k">def</span> <span class="nf">info</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">highlighted</span><span class="p">)</span> <span class="o">!=</span> <span class="mi">1</span><span class="p">:</span>
<span class="k">return</span> <span class="s1">&#39;&#39;</span>
<span class="n">transform_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">highlighted</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">return</span> <span class="sa">f</span><span class="s1">&#39;&lt;pre&gt;</span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span><span class="si">}</span><span class="s1">&lt;/pre&gt;&#39;</span></div>
<div class="viewcode-block" id="PipelineRenderer.layout_dot"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.layout_dot">[docs]</a> <span class="k">def</span> <span class="nf">layout_dot</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">layout</span> <span class="o">=</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">run</span><span class="p">([</span><span class="s1">&#39;dot&#39;</span><span class="p">,</span> <span class="s1">&#39;-Tdot&#39;</span><span class="p">],</span>
<span class="nb">input</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">to_dot</span><span class="p">()</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">),</span>
<span class="n">capture_output</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">check</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span><span class="o">.</span><span class="n">stdout</span>
<span class="c1"># Try to capture the positions for layout consistency.</span>
<span class="n">json_out</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span>
<span class="n">subprocess</span><span class="o">.</span><span class="n">run</span><span class="p">([</span><span class="s1">&#39;dot&#39;</span><span class="p">,</span> <span class="s1">&#39;-n2&#39;</span><span class="p">,</span> <span class="s1">&#39;-Kneato&#39;</span><span class="p">,</span> <span class="s1">&#39;-Tjson&#39;</span><span class="p">],</span>
<span class="nb">input</span><span class="o">=</span><span class="n">layout</span><span class="p">,</span>
<span class="n">capture_output</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">check</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span><span class="o">.</span><span class="n">stdout</span><span class="p">)</span>
<span class="k">for</span> <span class="n">box</span> <span class="ow">in</span> <span class="n">json_out</span><span class="p">[</span><span class="s1">&#39;objects&#39;</span><span class="p">]:</span>
<span class="n">name</span> <span class="o">=</span> <span class="n">box</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;name&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">:</span>
<span class="k">if</span> <span class="s1">&#39;pos&#39;</span> <span class="ow">in</span> <span class="n">box</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">box</span><span class="p">[</span><span class="s1">&#39;pos&#39;</span><span class="p">]</span>
<span class="k">elif</span> <span class="s1">&#39;bb&#39;</span> <span class="ow">in</span> <span class="n">box</span><span class="p">:</span>
<span class="n">x0</span><span class="p">,</span> <span class="n">y0</span><span class="p">,</span> <span class="n">x1</span><span class="p">,</span> <span class="n">y1</span> <span class="o">=</span> <span class="p">[</span><span class="nb">float</span><span class="p">(</span><span class="n">r</span><span class="p">)</span> <span class="k">for</span> <span class="n">r</span> <span class="ow">in</span> <span class="n">box</span><span class="p">[</span><span class="s1">&#39;bb&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;,&#39;</span><span class="p">)]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="sa">f</span><span class="s1">&#39;</span><span class="si">{</span><span class="p">(</span><span class="n">x0</span><span class="o">+</span><span class="n">x1</span><span class="p">)</span><span class="o">/</span><span class="mi">2</span><span class="si">}</span><span class="s1">,</span><span class="si">{</span><span class="p">(</span><span class="n">y0</span><span class="o">+</span><span class="n">y1</span><span class="p">)</span><span class="o">/</span><span class="mi">2</span><span class="si">}</span><span class="s1">&#39;</span>
<span class="k">return</span> <span class="n">layout</span></div>
<div class="viewcode-block" id="PipelineRenderer.page_callback_data"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.page_callback_data">[docs]</a> <span class="k">def</span> <span class="nf">page_callback_data</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">layout</span><span class="p">):</span>
<span class="n">svg</span> <span class="o">=</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">run</span><span class="p">([</span><span class="s1">&#39;dot&#39;</span><span class="p">,</span> <span class="s1">&#39;-Kneato&#39;</span><span class="p">,</span> <span class="s1">&#39;-n2&#39;</span><span class="p">,</span> <span class="s1">&#39;-Tsvg&#39;</span><span class="p">],</span>
<span class="nb">input</span><span class="o">=</span><span class="n">layout</span><span class="p">,</span>
<span class="n">capture_output</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">check</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span><span class="o">.</span><span class="n">stdout</span>
<span class="n">cmapx</span> <span class="o">=</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">run</span><span class="p">([</span><span class="s1">&#39;dot&#39;</span><span class="p">,</span> <span class="s1">&#39;-Kneato&#39;</span><span class="p">,</span> <span class="s1">&#39;-n2&#39;</span><span class="p">,</span> <span class="s1">&#39;-Tcmapx&#39;</span><span class="p">],</span>
<span class="nb">input</span><span class="o">=</span><span class="n">layout</span><span class="p">,</span>
<span class="n">capture_output</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">check</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span><span class="o">.</span><span class="n">stdout</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">&#39;src&#39;</span><span class="p">:</span> <span class="s1">&#39;data:image/svg+xml;base64,&#39;</span> <span class="o">+</span>
<span class="n">base64</span><span class="o">.</span><span class="n">b64encode</span><span class="p">(</span><span class="n">svg</span><span class="p">)</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">),</span>
<span class="s1">&#39;cmapx&#39;</span><span class="p">:</span> <span class="n">cmapx</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">),</span>
<span class="s1">&#39;info&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">info</span><span class="p">(),</span>
<span class="p">}</span></div>
<div class="viewcode-block" id="PipelineRenderer.render_data"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.render_data">[docs]</a> <span class="k">def</span> <span class="nf">render_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Re-rendering pipeline...&quot;</span><span class="p">)</span>
<span class="n">layout</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">layout_dot</span><span class="p">()</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">render_output</span><span class="p">:</span>
<span class="k">for</span> <span class="n">path</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">render_output</span><span class="p">:</span>
<span class="nb">format</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">splitext</span><span class="p">(</span><span class="n">path</span><span class="p">)[</span><span class="o">-</span><span class="mi">1</span><span class="p">][</span><span class="mi">1</span><span class="p">:]</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">run</span><span class="p">(</span>
<span class="p">[</span><span class="s1">&#39;dot&#39;</span><span class="p">,</span> <span class="s1">&#39;-Kneato&#39;</span><span class="p">,</span> <span class="s1">&#39;-n2&#39;</span><span class="p">,</span> <span class="s1">&#39;-T&#39;</span> <span class="o">+</span> <span class="nb">format</span><span class="p">,</span> <span class="s1">&#39;-o&#39;</span><span class="p">,</span> <span class="n">path</span><span class="p">],</span>
<span class="nb">input</span><span class="o">=</span><span class="n">layout</span><span class="p">,</span>
<span class="n">check</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span>
<span class="k">if</span> <span class="n">result</span><span class="o">.</span><span class="n">returncode</span><span class="p">:</span>
<span class="n">logging</span><span class="o">.</span><span class="n">error</span><span class="p">(</span>
<span class="s2">&quot;Failed render pipeline as </span><span class="si">%r</span><span class="s2">: exit </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">result</span><span class="o">.</span><span class="n">returncode</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Rendered pipeline as </span><span class="si">%r</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">path</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">page_callback_data</span><span class="p">(</span><span class="n">layout</span><span class="p">)</span></div>
<div class="viewcode-block" id="PipelineRenderer.render_json"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.render_json">[docs]</a> <span class="k">def</span> <span class="nf">render_json</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">render_data</span><span class="p">())</span></div>
<div class="viewcode-block" id="PipelineRenderer.page"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.page">[docs]</a> <span class="k">def</span> <span class="nf">page</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">data</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">render_data</span><span class="p">()</span>
<span class="n">src</span> <span class="o">=</span> <span class="n">data</span><span class="p">[</span><span class="s1">&#39;src&#39;</span><span class="p">]</span>
<span class="n">cmapx</span> <span class="o">=</span> <span class="n">data</span><span class="p">[</span><span class="s1">&#39;cmapx&#39;</span><span class="p">]</span>
<span class="k">return</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> &lt;html&gt;</span>
<span class="s2"> &lt;head&gt;</span>
<span class="s2"> &lt;script&gt;</span>
<span class="s2"> function click(transform_id) {</span>
<span class="s2"> var xhttp = new XMLHttpRequest();</span>
<span class="s2"> xhttp.onreadystatechange = function() {</span>
<span class="s2"> render_data = JSON.parse(this.responseText);</span>
<span class="s2"> document.getElementById(&#39;image_map_holder&#39;).innerHTML =</span>
<span class="s2"> render_data.cmapx;</span>
<span class="s2"> document.getElementById(&#39;image_tag&#39;).src = render_data.src</span>
<span class="s2"> document.getElementById(&#39;info&#39;).innerHTML = render_data.info</span>
<span class="s2"> };</span>
<span class="s2"> xhttp.open(&quot;GET&quot;, &quot;render?toggle=&quot; + transform_id, true);</span>
<span class="s2"> xhttp.send();</span>
<span class="s2"> }</span>
<span class="s2"> &lt;/script&gt;</span>
<span class="s2"> &lt;/head&gt;</span>
<span class="s2"> &quot;&quot;&quot;</span> <span class="o">+</span> <span class="sa">f</span><span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> &lt;body&gt;</span>
<span class="s2"> Click on a composite transform to expand.</span>
<span class="s2"> &lt;br&gt;</span>
<span class="s2"> &lt;img id=&#39;image_tag&#39; src=&#39;</span><span class="si">{</span><span class="n">src</span><span class="si">}</span><span class="s2">&#39; usemap=&#39;#G&#39;&gt;</span>
<span class="s2"> &lt;hr&gt;</span>
<span class="s2"> &lt;div id=&#39;info&#39;&gt;&lt;/div&gt;</span>
<span class="s2"> &lt;div id=&#39;image_map_holder&#39;&gt;</span>
<span class="s2"> </span><span class="si">{</span><span class="n">cmapx</span><span class="si">}</span>
<span class="s2"> &lt;/div&gt;</span>
<span class="s2"> &lt;/body&gt;</span>
<span class="s2"> &lt;/html&gt;</span>
<span class="s2"> &quot;&quot;&quot;</span></div></div>
<div class="viewcode-block" id="RenderRunner"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.RenderRunner">[docs]</a><span class="k">class</span> <span class="nc">RenderRunner</span><span class="p">(</span><span class="n">runner</span><span class="o">.</span><span class="n">PipelineRunner</span><span class="p">):</span>
<span class="c1"># TODO(robertwb): Consider making this a runner wrapper, where live status</span>
<span class="c1"># (such as counters, stage completion status, or possibly even PCollection</span>
<span class="c1"># samples) queryable and/or displayed. This could evolve into a full Beam</span>
<span class="c1"># UI.</span>
<div class="viewcode-block" id="RenderRunner.run_pipeline"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.RenderRunner.run_pipeline">[docs]</a> <span class="k">def</span> <span class="nf">run_pipeline</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pipeline_object</span><span class="p">,</span> <span class="n">options</span><span class="p">,</span> <span class="n">pipeline_proto</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">pipeline_proto</span><span class="p">:</span>
<span class="n">pipeline_proto</span> <span class="o">=</span> <span class="n">pipeline_object</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">()</span>
<span class="n">render_options</span> <span class="o">=</span> <span class="n">options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">RenderOptions</span><span class="p">)</span>
<span class="k">if</span> <span class="n">render_options</span><span class="o">.</span><span class="n">log_proto</span><span class="p">:</span>
<span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="n">pipeline_proto</span><span class="p">)</span>
<span class="n">renderer</span> <span class="o">=</span> <span class="n">PipelineRenderer</span><span class="p">(</span><span class="n">pipeline_proto</span><span class="p">,</span> <span class="n">render_options</span><span class="p">)</span>
<span class="n">renderer</span><span class="o">.</span><span class="n">page</span><span class="p">()</span>
<span class="k">if</span> <span class="n">render_options</span><span class="o">.</span><span class="n">render_port</span> <span class="o">&gt;=</span> <span class="mi">0</span><span class="p">:</span>
<span class="c1"># TODO: If this gets more complex, we could consider taking on a</span>
<span class="c1"># framework like Flask as a dependency.</span>
<span class="k">class</span> <span class="nc">RequestHandler</span><span class="p">(</span><span class="n">http</span><span class="o">.</span><span class="n">server</span><span class="o">.</span><span class="n">BaseHTTPRequestHandler</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">do_GET</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">parts</span> <span class="o">=</span> <span class="n">urllib</span><span class="o">.</span><span class="n">parse</span><span class="o">.</span><span class="n">urlparse</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="p">)</span>
<span class="n">args</span> <span class="o">=</span> <span class="n">urllib</span><span class="o">.</span><span class="n">parse</span><span class="o">.</span><span class="n">parse_qs</span><span class="p">(</span><span class="n">parts</span><span class="o">.</span><span class="n">query</span><span class="p">)</span>
<span class="n">renderer</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="o">**</span><span class="n">args</span><span class="p">)</span>
<span class="k">if</span> <span class="n">parts</span><span class="o">.</span><span class="n">path</span> <span class="o">==</span> <span class="s1">&#39;/&#39;</span><span class="p">:</span>
<span class="n">response</span> <span class="o">=</span> <span class="n">renderer</span><span class="o">.</span><span class="n">page</span><span class="p">()</span>
<span class="k">elif</span> <span class="n">parts</span><span class="o">.</span><span class="n">path</span> <span class="o">==</span> <span class="s1">&#39;/render&#39;</span><span class="p">:</span>
<span class="n">response</span> <span class="o">=</span> <span class="n">renderer</span><span class="o">.</span><span class="n">render_json</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">send_response</span><span class="p">(</span><span class="mi">400</span><span class="p">)</span>
<span class="k">return</span>
<span class="bp">self</span><span class="o">.</span><span class="n">send_response</span><span class="p">(</span><span class="mi">200</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">send_header</span><span class="p">(</span><span class="s2">&quot;Content-type&quot;</span><span class="p">,</span> <span class="s2">&quot;text/html&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">end_headers</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">wfile</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">response</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">))</span>
<span class="n">server</span> <span class="o">=</span> <span class="n">http</span><span class="o">.</span><span class="n">server</span><span class="o">.</span><span class="n">HTTPServer</span><span class="p">((</span><span class="s1">&#39;localhost&#39;</span><span class="p">,</span> <span class="n">render_options</span><span class="o">.</span><span class="n">render_port</span><span class="p">),</span>
<span class="n">RequestHandler</span><span class="p">)</span>
<span class="n">server_thread</span> <span class="o">=</span> <span class="n">threading</span><span class="o">.</span><span class="n">Thread</span><span class="p">(</span><span class="n">target</span><span class="o">=</span><span class="n">server</span><span class="o">.</span><span class="n">serve_forever</span><span class="p">,</span> <span class="n">daemon</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="n">server_thread</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
<span class="nb">print</span><span class="p">(</span><span class="s1">&#39;Serving at http://</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">server</span><span class="o">.</span><span class="n">server_address</span><span class="p">)</span>
<span class="k">return</span> <span class="n">RenderPipelineResult</span><span class="p">(</span><span class="n">server</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">RenderPipelineResult</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span></div></div>
<div class="viewcode-block" id="RenderPipelineResult"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.RenderPipelineResult">[docs]</a><span class="k">class</span> <span class="nc">RenderPipelineResult</span><span class="p">(</span><span class="n">runner</span><span class="o">.</span><span class="n">PipelineResult</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">server</span><span class="p">):</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">runner</span><span class="o">.</span><span class="n">PipelineState</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">server</span> <span class="o">=</span> <span class="n">server</span>
<div class="viewcode-block" id="RenderPipelineResult.wait_until_finish"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.RenderPipelineResult.wait_until_finish">[docs]</a> <span class="k">def</span> <span class="nf">wait_until_finish</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">duration</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">server</span><span class="p">:</span>
<span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="n">duration</span> <span class="ow">or</span> <span class="mf">1e8</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">server</span><span class="o">.</span><span class="n">shutdown</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_state</span> <span class="o">=</span> <span class="n">runner</span><span class="o">.</span><span class="n">PipelineState</span><span class="o">.</span><span class="n">DONE</span></div>
<div class="viewcode-block" id="RenderPipelineResult.monitoring_infos"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.RenderPipelineResult.monitoring_infos">[docs]</a> <span class="k">def</span> <span class="nf">monitoring_infos</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">[]</span></div></div>
<div class="viewcode-block" id="run"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.run">[docs]</a><span class="k">def</span> <span class="nf">run</span><span class="p">(</span><span class="n">argv</span><span class="p">):</span>
<span class="k">if</span> <span class="n">argv</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="o">==</span> <span class="vm">__file__</span><span class="p">:</span>
<span class="n">argv</span> <span class="o">=</span> <span class="n">argv</span><span class="p">[</span><span class="mi">1</span><span class="p">:]</span>
<span class="n">parser</span> <span class="o">=</span> <span class="n">argparse</span><span class="o">.</span><span class="n">ArgumentParser</span><span class="p">(</span>
<span class="n">description</span><span class="o">=</span><span class="vm">__doc__</span><span class="p">,</span> <span class="n">formatter_class</span><span class="o">=</span><span class="n">argparse</span><span class="o">.</span><span class="n">RawDescriptionHelpFormatter</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--job_port&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;port on which to serve the job api&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--pipeline_proto&#39;</span><span class="p">,</span> <span class="n">help</span><span class="o">=</span><span class="s1">&#39;file containing the beam pipeline definition&#39;</span><span class="p">)</span>
<span class="n">RenderOptions</span><span class="o">.</span><span class="n">_add_argparse_args</span><span class="p">(</span><span class="n">parser</span><span class="p">)</span>
<span class="n">options</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">(</span><span class="n">argv</span><span class="p">)</span>
<span class="k">if</span> <span class="n">options</span><span class="o">.</span><span class="n">pipeline_proto</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">options</span><span class="o">.</span><span class="n">render_output</span> <span class="ow">and</span> <span class="n">options</span><span class="o">.</span><span class="n">render_port</span> <span class="o">&lt;</span> <span class="mi">0</span><span class="p">:</span>
<span class="n">options</span><span class="o">.</span><span class="n">render_port</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">render_one</span><span class="p">(</span><span class="n">options</span><span class="p">)</span>
<span class="k">if</span> <span class="n">options</span><span class="o">.</span><span class="n">render_output</span><span class="p">:</span>
<span class="k">return</span>
<span class="n">run_server</span><span class="p">(</span><span class="n">options</span><span class="p">)</span></div>
<div class="viewcode-block" id="render_one"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.render_one">[docs]</a><span class="k">def</span> <span class="nf">render_one</span><span class="p">(</span><span class="n">options</span><span class="p">):</span>
<span class="k">if</span> <span class="n">options</span><span class="o">.</span><span class="n">pipeline_proto</span> <span class="o">==</span> <span class="s1">&#39;-&#39;</span><span class="p">:</span>
<span class="n">content</span> <span class="o">=</span> <span class="n">sys</span><span class="o">.</span><span class="n">stdin</span><span class="o">.</span><span class="n">buffer</span><span class="o">.</span><span class="n">read</span><span class="p">()</span>
<span class="k">if</span> <span class="n">content</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="o">==</span> <span class="sa">b</span><span class="s1">&#39;{&#39;</span><span class="p">:</span>
<span class="n">ext</span> <span class="o">=</span> <span class="s1">&#39;.json&#39;</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">content</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">)</span>
<span class="n">ext</span> <span class="o">=</span> <span class="s1">&#39;.textproto&#39;</span>
<span class="k">except</span> <span class="ne">UnicodeDecodeError</span><span class="p">:</span>
<span class="n">ext</span> <span class="o">=</span> <span class="s1">&#39;.pb&#39;</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">if</span> <span class="n">options</span><span class="o">.</span><span class="n">pipeline_proto</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;gs://&#39;</span><span class="p">):</span>
<span class="k">if</span> <span class="n">gcsio</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ImportError</span><span class="p">(</span><span class="s1">&#39;GCS not available; please install apache_beam[gcp]&#39;</span><span class="p">)</span>
<span class="n">open_fn</span> <span class="o">=</span> <span class="n">gcsio</span><span class="o">.</span><span class="n">GcsIO</span><span class="p">()</span><span class="o">.</span><span class="n">open</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">open_fn</span> <span class="o">=</span> <span class="nb">open</span>
<span class="k">with</span> <span class="n">open_fn</span><span class="p">(</span><span class="n">options</span><span class="o">.</span><span class="n">pipeline_proto</span><span class="p">,</span> <span class="s1">&#39;rb&#39;</span><span class="p">)</span> <span class="k">as</span> <span class="n">fin</span><span class="p">:</span>
<span class="n">content</span> <span class="o">=</span> <span class="n">fin</span><span class="o">.</span><span class="n">read</span><span class="p">()</span>
<span class="n">ext</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">splitext</span><span class="p">(</span><span class="n">options</span><span class="o">.</span><span class="n">pipeline_proto</span><span class="p">)[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span>
<span class="k">if</span> <span class="n">ext</span> <span class="o">==</span> <span class="s1">&#39;.textproto&#39;</span><span class="p">:</span>
<span class="n">pipeline_proto</span> <span class="o">=</span> <span class="n">text_format</span><span class="o">.</span><span class="n">Parse</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">())</span>
<span class="k">elif</span> <span class="n">ext</span> <span class="o">==</span> <span class="s1">&#39;.json&#39;</span><span class="p">:</span>
<span class="n">pipeline_proto</span> <span class="o">=</span> <span class="n">json_format</span><span class="o">.</span><span class="n">Parse</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">())</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">pipeline_proto</span> <span class="o">=</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">()</span>
<span class="n">pipeline_proto</span><span class="o">.</span><span class="n">ParseFromString</span><span class="p">(</span><span class="n">content</span><span class="p">)</span>
<span class="n">RenderRunner</span><span class="p">()</span><span class="o">.</span><span class="n">run_pipeline</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span> <span class="n">pipeline_options</span><span class="o">.</span><span class="n">PipelineOptions</span><span class="p">(</span><span class="o">**</span><span class="nb">vars</span><span class="p">(</span><span class="n">options</span><span class="p">)),</span> <span class="n">pipeline_proto</span><span class="p">)</span></div>
<div class="viewcode-block" id="run_server"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.run_server">[docs]</a><span class="k">def</span> <span class="nf">run_server</span><span class="p">(</span><span class="n">options</span><span class="p">):</span>
<span class="k">class</span> <span class="nc">RenderBeamJob</span><span class="p">(</span><span class="n">local_job_service</span><span class="o">.</span><span class="n">BeamJob</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">_invoke_runner</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="n">RenderRunner</span><span class="p">()</span><span class="o">.</span><span class="n">run_pipeline</span><span class="p">(</span>
<span class="kc">None</span><span class="p">,</span>
<span class="n">pipeline_options</span><span class="o">.</span><span class="n">PipelineOptions</span><span class="p">(</span><span class="o">**</span><span class="nb">vars</span><span class="p">(</span><span class="n">options</span><span class="p">)),</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pipeline_proto</span><span class="p">)</span>
<span class="k">with</span> <span class="n">tempfile</span><span class="o">.</span><span class="n">TemporaryDirectory</span><span class="p">()</span> <span class="k">as</span> <span class="n">staging_dir</span><span class="p">:</span>
<span class="n">job_servicer</span> <span class="o">=</span> <span class="n">local_job_service</span><span class="o">.</span><span class="n">LocalJobServicer</span><span class="p">(</span>
<span class="n">staging_dir</span><span class="p">,</span> <span class="n">beam_job_type</span><span class="o">=</span><span class="n">RenderBeamJob</span><span class="p">)</span>
<span class="n">port</span> <span class="o">=</span> <span class="n">job_servicer</span><span class="o">.</span><span class="n">start_grpc_server</span><span class="p">(</span><span class="n">options</span><span class="o">.</span><span class="n">job_port</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">local_job_service_main</span><span class="o">.</span><span class="n">serve</span><span class="p">(</span>
<span class="s2">&quot;Listening for beam jobs on port </span><span class="si">%d</span><span class="s2">.&quot;</span> <span class="o">%</span> <span class="n">port</span><span class="p">,</span> <span class="n">job_servicer</span><span class="p">)</span>
<span class="k">finally</span><span class="p">:</span>
<span class="n">job_servicer</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span></div>
<span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s1">&#39;__main__&#39;</span><span class="p">:</span>
<span class="n">logging</span><span class="o">.</span><span class="n">basicConfig</span><span class="p">()</span>
<span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">()</span><span class="o">.</span><span class="n">setLevel</span><span class="p">(</span><span class="n">logging</span><span class="o">.</span><span class="n">INFO</span><span class="p">)</span>
<span class="n">run</span><span class="p">(</span><span class="n">sys</span><span class="o">.</span><span class="n">argv</span><span class="p">)</span>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
&copy; Copyright
</p>
</div>
Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script type="text/javascript">
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>