| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <title>apache_beam.runners.render — Apache Beam 2.47.0 documentation</title> |
| |
| |
| |
| |
| |
| |
| |
| |
| <script type="text/javascript" src="../../../_static/js/modernizr.min.js"></script> |
| |
| |
| <script type="text/javascript" id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script> |
| <script type="text/javascript" src="../../../_static/jquery.js"></script> |
| <script type="text/javascript" src="../../../_static/underscore.js"></script> |
| <script type="text/javascript" src="../../../_static/doctools.js"></script> |
| <script type="text/javascript" src="../../../_static/language_data.js"></script> |
| <script async="async" type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
| |
| <script type="text/javascript" src="../../../_static/js/theme.js"></script> |
| |
| |
| |
| |
| <link rel="stylesheet" href="../../../_static/css/theme.css" type="text/css" /> |
| <link rel="stylesheet" href="../../../_static/pygments.css" type="text/css" /> |
| <link rel="index" title="Index" href="../../../genindex.html" /> |
| <link rel="search" title="Search" href="../../../search.html" /> |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search" > |
| |
| |
| |
| <a href="../../../index.html" class="icon icon-home"> Apache Beam |
| |
| |
| |
| </a> |
| |
| |
| |
| |
| <div class="version"> |
| 2.47.0 |
| </div> |
| |
| |
| |
| |
| <div role="search"> |
| <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get"> |
| <input type="text" name="q" placeholder="Search docs" /> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| </div> |
| |
| |
| </div> |
| |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| |
| |
| |
| |
| |
| |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li> |
| </ul> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
| </ul> |
| |
| |
| |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
| |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="../../../index.html">Apache Beam</a> |
| |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| |
| <ul class="wy-breadcrumbs"> |
| |
| <li><a href="../../../index.html">Docs</a> »</li> |
| |
| <li><a href="../../index.html">Module code</a> »</li> |
| |
| <li>apache_beam.runners.render</li> |
| |
| |
| <li class="wy-breadcrumbs-aside"> |
| |
| </li> |
| |
| </ul> |
| |
| |
| <hr/> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.runners.render</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="sd">"""A portable "runner" that renders a beam graph.</span> |
| |
| <span class="sd">This runner can either render the graph to a (set of) output path(s), as</span> |
| <span class="sd">designated by (possibly repeated) --render_output, or serve the pipeline as</span> |
| <span class="sd">an interactive graph, if --render_port is set.</span> |
| |
| <span class="sd">In Python, this runner can be passed directly at pipeline construction, e.g.::</span> |
| |
| <span class="sd"> with beam.Pipeline(runner=beam.runners.render.RenderRunner(), options=...)</span> |
| |
| <span class="sd">For other languages, start this service a by running::</span> |
| |
| <span class="sd"> python -m apache_beam.runners.render --job_port=PORT ...</span> |
| |
| <span class="sd">and then run your pipline with the PortableRunner setting the job endpoint</span> |
| <span class="sd">to `localhost:PORT`.</span> |
| |
| <span class="sd">If any `--render_output=path.ext` flags are passed, each submitted job will</span> |
| <span class="sd">get written to the given output (overwriting any previously existing file).</span> |
| |
| <span class="sd">If `--render_port` is set to a non-negative value, a local http server will</span> |
| <span class="sd">be started which allows for interactive exploration of the pipeline graph.</span> |
| |
| <span class="sd">As an alternative to starting a job server, a single pipeline can be rendered</span> |
| <span class="sd">by passing a pipeline proto file to `--pipeline_proto`. For example</span> |
| |
| <span class="sd"> python -m apache_beam.runners.render \\</span> |
| <span class="sd"> --pipeline_proto gs://<staging_location>/pipeline.pb \\</span> |
| <span class="sd"> --render_output=/tmp/pipeline.svg</span> |
| |
| <span class="sd">Requires the graphviz dot executable to be available in the path.</span> |
| <span class="sd">"""</span> |
| |
| <span class="kn">import</span> <span class="nn">argparse</span> |
| <span class="kn">import</span> <span class="nn">base64</span> |
| <span class="kn">import</span> <span class="nn">collections</span> |
| <span class="kn">import</span> <span class="nn">http.server</span> |
| <span class="kn">import</span> <span class="nn">json</span> |
| <span class="kn">import</span> <span class="nn">logging</span> |
| <span class="kn">import</span> <span class="nn">os</span> |
| <span class="kn">import</span> <span class="nn">re</span> |
| <span class="kn">import</span> <span class="nn">subprocess</span> |
| <span class="kn">import</span> <span class="nn">sys</span> |
| <span class="kn">import</span> <span class="nn">tempfile</span> |
| <span class="kn">import</span> <span class="nn">threading</span> |
| <span class="kn">import</span> <span class="nn">time</span> |
| <span class="kn">import</span> <span class="nn">urllib.parse</span> |
| |
| <span class="kn">from</span> <span class="nn">google.protobuf</span> <span class="kn">import</span> <span class="n">json_format</span> |
| <span class="kn">from</span> <span class="nn">google.protobuf</span> <span class="kn">import</span> <span class="n">text_format</span> <span class="c1"># type: ignore</span> |
| |
| <span class="kn">from</span> <span class="nn">apache_beam.options</span> <span class="kn">import</span> <span class="n">pipeline_options</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.portability.api</span> <span class="kn">import</span> <span class="n">beam_runner_api_pb2</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners</span> <span class="kn">import</span> <span class="n">runner</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.portability</span> <span class="kn">import</span> <span class="n">local_job_service</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.portability</span> <span class="kn">import</span> <span class="n">local_job_service_main</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.portability.fn_api_runner</span> <span class="kn">import</span> <span class="n">translations</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp</span> <span class="kn">import</span> <span class="n">gcsio</span> |
| <span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span> |
| <span class="n">gcsio</span> <span class="o">=</span> <span class="kc">None</span> <span class="c1"># type: ignore</span> |
| |
| <span class="c1"># From the Beam site, circa November 2022.</span> |
| <span class="n">DEFAULT_EDGE_STYLE</span> <span class="o">=</span> <span class="s1">'color="#ff570b"'</span> |
| <span class="n">DEFAULT_TRANSFORM_STYLE</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="s1">'shape=rect style="rounded, filled" color="#ff570b" fillcolor="#fff6dd"'</span><span class="p">)</span> |
| <span class="n">DEFAULT_HIGHLIGHT_STYLE</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="s1">'shape=rect style="rounded, filled" color="#ff570b" fillcolor="#ffdb97"'</span><span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="RenderOptions"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.RenderOptions">[docs]</a><span class="k">class</span> <span class="nc">RenderOptions</span><span class="p">(</span><span class="n">pipeline_options</span><span class="o">.</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Rendering options."""</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--render_port'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=-</span><span class="mi">1</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The port at which to serve the graph. '</span> |
| <span class="s1">'If 0, an unused port will be chosen. '</span> |
| <span class="s1">'If -1, the server will not be started.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--render_output'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'append'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'A path or paths to which to write rendered output. '</span> |
| <span class="s1">'The output type will be deduced from the file extension.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--render_leaf_composite_nodes'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'append'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'A set of regular expressions for transform names that should '</span> |
| <span class="s1">'not be expanded. For example, one could pass "</span><span class="se">\b</span><span class="s1">Read.*" to indicate '</span> |
| <span class="s1">'the inner structure of read nodes should not be expanded. '</span> |
| <span class="s1">'If not given, defaults to the top-level nodes if interactively '</span> |
| <span class="s1">'serving the graph and expanding all nodes otherwise.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--render_edge_attributes'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="s1">''</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Graphviz attributes to add to all edges.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--render_node_attributes'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="s1">''</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Graphviz attributes to add to all nodes.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--render_highlight_attributes'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="s1">''</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Graphviz attributes to add to all highlighted nodes.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--log_proto'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Set to also log input pipeline proto to stdout.'</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">parser</span></div> |
| |
| |
| <div class="viewcode-block" id="PipelineRenderer"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer">[docs]</a><span class="k">class</span> <span class="nc">PipelineRenderer</span><span class="p">:</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pipeline</span><span class="p">,</span> <span class="n">options</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span> <span class="o">=</span> <span class="n">pipeline</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">options</span> <span class="o">=</span> <span class="n">options</span> |
| |
| <span class="c1"># Drill down into any uninteresting, top-level transforms that contain</span> |
| <span class="c1"># the whole pipeline (often added by the SDK).</span> |
| <span class="n">roots</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">root_transform_ids</span> |
| <span class="k">while</span> <span class="nb">len</span><span class="p">(</span><span class="n">roots</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span> |
| <span class="n">root</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">roots</span><span class="p">[</span><span class="mi">0</span><span class="p">]]</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">root</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">:</span> |
| <span class="k">break</span> |
| <span class="n">roots</span> <span class="o">=</span> <span class="n">root</span><span class="o">.</span><span class="n">subtransforms</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">roots</span> <span class="o">=</span> <span class="n">roots</span> |
| |
| <span class="c1"># Figure out at what point to stop rendering composite internals.</span> |
| <span class="k">if</span> <span class="n">options</span><span class="o">.</span><span class="n">render_leaf_composite_nodes</span><span class="p">:</span> |
| <span class="n">is_leaf</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">name</span><span class="p">:</span> <span class="nb">any</span><span class="p">(</span> |
| <span class="n">re</span><span class="o">.</span><span class="n">match</span><span class="p">(</span><span class="n">pattern</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">patterns</span> <span class="ow">in</span> <span class="n">options</span><span class="o">.</span><span class="n">render_leaf_composite_nodes</span> |
| <span class="k">for</span> <span class="n">pattern</span> <span class="ow">in</span> <span class="n">patterns</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">','</span><span class="p">))</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">mark_leaves</span><span class="p">(</span><span class="n">transform_ids</span><span class="p">):</span> |
| <span class="k">for</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="n">transform_ids</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">is_leaf</span><span class="p">(</span><span class="n">transform_id</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">mark_leaves</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">)</span> |
| |
| <span class="k">elif</span> <span class="n">options</span><span class="o">.</span><span class="n">render_port</span> <span class="o">>=</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="c1"># Start interactive with no unfolding.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span> <span class="o">=</span> <span class="nb">set</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">roots</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># For non-interactive, expand fully.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span> |
| |
| <span class="c1"># Useful for attempting graph layout consistency.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">highlighted</span> <span class="o">=</span> <span class="p">[]</span> |
| |
| <div class="viewcode-block" id="PipelineRenderer.update"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.update">[docs]</a> <span class="k">def</span> <span class="nf">update</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">toggle</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">toggle</span><span class="p">:</span> |
| <span class="n">transform_id</span> <span class="o">=</span> <span class="n">toggle</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">highlighted</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_id</span><span class="p">]</span> |
| <span class="k">if</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span><span class="p">:</span> |
| <span class="n">transform</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span> |
| <span class="k">if</span> <span class="n">transform</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span><span class="o">.</span><span class="n">remove</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">subtransform</span> <span class="ow">in</span> <span class="n">transform</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">subtransform</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">[</span><span class="n">subtransform</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">[</span> |
| <span class="n">transform_id</span><span class="p">]</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.style"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.style">[docs]</a> <span class="k">def</span> <span class="nf">style</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_id</span><span class="p">):</span> |
| <span class="n">base</span> <span class="o">=</span> <span class="s1">' '</span><span class="o">.</span><span class="n">join</span><span class="p">(</span> |
| <span class="p">[</span><span class="n">DEFAULT_TRANSFORM_STYLE</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">render_node_attributes</span><span class="p">])</span> |
| <span class="k">if</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">highlighted</span><span class="p">:</span> |
| <span class="k">return</span> <span class="s1">' '</span><span class="o">.</span><span class="n">join</span><span class="p">([</span> |
| <span class="n">base</span><span class="p">,</span> |
| <span class="n">DEFAULT_HIGHLIGHT_STYLE</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">render_highlight_attributes</span> |
| <span class="p">])</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">base</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.to_dot"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.to_dot">[docs]</a> <span class="k">def</span> <span class="nf">to_dot</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="s1">'</span><span class="se">\n</span><span class="s1">'</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">to_dot_iter</span><span class="p">())</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.to_dot_iter"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.to_dot_iter">[docs]</a> <span class="k">def</span> <span class="nf">to_dot_iter</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">yield</span> <span class="s1">'digraph G {'</span> |
| <span class="c1"># Defer drawing any edges until the end lest we declare nodes too early.</span> |
| <span class="n">edges_out</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">for</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">roots</span><span class="p">:</span> |
| <span class="k">yield from</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform_to_dot</span><span class="p">(</span> |
| <span class="n">transform_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">pcoll_leaf_consumers</span><span class="p">(),</span> <span class="n">edges_out</span><span class="p">)</span> |
| <span class="k">yield from</span> <span class="n">edges_out</span> |
| <span class="k">yield</span> <span class="s1">'}'</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.transform_to_dot"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.transform_to_dot">[docs]</a> <span class="k">def</span> <span class="nf">transform_to_dot</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_id</span><span class="p">,</span> <span class="n">pcoll_leaf_consumers</span><span class="p">,</span> <span class="n">edges_out</span><span class="p">):</span> |
| <span class="n">transform</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_leaf</span><span class="p">(</span><span class="n">transform_id</span><span class="p">):</span> |
| <span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform_node</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span> |
| <span class="n">transform_inputs</span> <span class="o">=</span> <span class="nb">set</span><span class="p">(</span><span class="n">transform</span><span class="o">.</span><span class="n">inputs</span><span class="o">.</span><span class="n">values</span><span class="p">())</span> |
| <span class="k">for</span> <span class="n">name</span><span class="p">,</span> <span class="n">output</span> <span class="ow">in</span> <span class="n">transform</span><span class="o">.</span><span class="n">outputs</span><span class="o">.</span><span class="n">items</span><span class="p">():</span> |
| <span class="c1"># For outputs that are also inputs, it's ambiguous whether they are</span> |
| <span class="c1"># consumed as the outputs of this transform, or of the upstream</span> |
| <span class="c1"># transform. Render the latter.</span> |
| <span class="k">if</span> <span class="n">output</span> <span class="ow">in</span> <span class="n">transform_inputs</span><span class="p">:</span> |
| <span class="k">continue</span> |
| <span class="n">output_label</span> <span class="o">=</span> <span class="n">name</span> <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">transform</span><span class="o">.</span><span class="n">outputs</span><span class="p">)</span> <span class="o">></span> <span class="mi">1</span> <span class="k">else</span> <span class="s1">''</span> |
| <span class="k">for</span> <span class="n">consumer</span><span class="p">,</span> <span class="n">is_side_input</span> <span class="ow">in</span> <span class="n">pcoll_leaf_consumers</span><span class="p">[</span><span class="n">output</span><span class="p">]:</span> |
| <span class="c1"># Can't yield this here as the consumer might not be in this cluster.</span> |
| <span class="n">edge_style</span> <span class="o">=</span> <span class="s1">'dashed'</span> <span class="k">if</span> <span class="n">is_side_input</span> <span class="k">else</span> <span class="s1">'solid'</span> |
| <span class="n">edge_attributes</span> <span class="o">=</span> <span class="s1">' '</span><span class="o">.</span><span class="n">join</span><span class="p">([</span> |
| <span class="sa">f</span><span class="s1">'label="</span><span class="si">{</span><span class="n">output_label</span><span class="si">}</span><span class="s1">" style=</span><span class="si">{</span><span class="n">edge_style</span><span class="si">}</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">DEFAULT_EDGE_STYLE</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">render_edge_attributes</span> |
| <span class="p">])</span> |
| <span class="n">edges_out</span><span class="o">.</span><span class="n">append</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s1">'"</span><span class="si">{</span><span class="n">transform_id</span><span class="si">}</span><span class="s1">" -> "</span><span class="si">{</span><span class="n">consumer</span><span class="si">}</span><span class="s1">" [</span><span class="si">{</span><span class="n">edge_attributes</span><span class="si">}</span><span class="s1">]'</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="sa">f</span><span class="s1">'subgraph "cluster_</span><span class="si">{</span><span class="n">transform_id</span><span class="si">}</span><span class="s1">" </span><span class="se">{{</span><span class="s1">'</span> |
| <span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform_attributes</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">subtransform</span> <span class="ow">in</span> <span class="n">transform</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">:</span> |
| <span class="k">yield from</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform_to_dot</span><span class="p">(</span> |
| <span class="n">subtransform</span><span class="p">,</span> <span class="n">pcoll_leaf_consumers</span><span class="p">,</span> <span class="n">edges_out</span><span class="p">)</span> |
| <span class="k">yield</span> <span class="s1">'}'</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.transform_node"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.transform_node">[docs]</a> <span class="k">def</span> <span class="nf">transform_node</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_id</span><span class="p">):</span> |
| <span class="k">return</span> <span class="sa">f</span><span class="s1">'"</span><span class="si">{</span><span class="n">transform_id</span><span class="si">}</span><span class="s1">" [</span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">transform_attributes</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span><span class="si">}</span><span class="s1">]'</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.transform_attributes"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.transform_attributes">[docs]</a> <span class="k">def</span> <span class="nf">transform_attributes</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_id</span><span class="p">):</span> |
| <span class="n">transform</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span> |
| <span class="n">local_name</span> <span class="o">=</span> <span class="n">transform</span><span class="o">.</span><span class="n">unique_name</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'/'</span><span class="p">)[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span> |
| <span class="k">if</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">:</span> |
| <span class="n">pos_str</span> <span class="o">=</span> <span class="sa">f</span><span class="s1">'pos="</span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span><span class="si">}</span><span class="s1">"'</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">pos_str</span> <span class="o">=</span> <span class="s1">''</span> |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="sa">f</span><span class="s1">'label="</span><span class="si">{</span><span class="n">local_name</span><span class="si">}</span><span class="s1">" </span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">style</span><span class="p">(</span><span class="n">transform_id</span><span class="p">)</span><span class="si">}</span><span class="s1"> '</span> |
| <span class="sa">f</span><span class="s1">'URL="javascript:click(</span><span class="se">\'</span><span class="si">{</span><span class="n">transform_id</span><span class="si">}</span><span class="se">\'</span><span class="s1">)" </span><span class="si">{</span><span class="n">pos_str</span><span class="si">}</span><span class="s1">'</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.pcoll_leaf_consumers_iter"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.pcoll_leaf_consumers_iter">[docs]</a> <span class="k">def</span> <span class="nf">pcoll_leaf_consumers_iter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_id</span><span class="p">):</span> |
| <span class="n">transform</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span> |
| <span class="n">transform_inputs</span> <span class="o">=</span> <span class="nb">set</span><span class="p">(</span><span class="n">transform</span><span class="o">.</span><span class="n">inputs</span><span class="o">.</span><span class="n">values</span><span class="p">())</span> |
| <span class="n">side_inputs</span> <span class="o">=</span> <span class="nb">set</span><span class="p">(</span><span class="n">translations</span><span class="o">.</span><span class="n">side_inputs</span><span class="p">(</span><span class="n">transform</span><span class="p">)</span><span class="o">.</span><span class="n">values</span><span class="p">())</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_leaf</span><span class="p">(</span><span class="n">transform_id</span><span class="p">):</span> |
| <span class="k">for</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">transform</span><span class="o">.</span><span class="n">inputs</span><span class="o">.</span><span class="n">values</span><span class="p">():</span> |
| <span class="k">yield</span> <span class="n">pcoll</span><span class="p">,</span> <span class="p">(</span><span class="n">transform_id</span><span class="p">,</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">side_inputs</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">subtransform</span> <span class="ow">in</span> <span class="n">transform</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">pcoll</span><span class="p">,</span> <span class="p">(</span><span class="n">consumer</span><span class="p">,</span> |
| <span class="n">annotation</span><span class="p">)</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pcoll_leaf_consumers_iter</span><span class="p">(</span><span class="n">subtransform</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_leaf</span><span class="p">(</span><span class="n">transform_id</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">pcoll</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">transform_inputs</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">pcoll</span><span class="p">,</span> <span class="p">(</span><span class="n">transform_id</span><span class="p">,</span> <span class="n">annotation</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">pcoll</span><span class="p">,</span> <span class="p">(</span><span class="n">consumer</span><span class="p">,</span> <span class="n">annotation</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.pcoll_leaf_consumers"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.pcoll_leaf_consumers">[docs]</a> <span class="k">def</span> <span class="nf">pcoll_leaf_consumers</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="n">collections</span><span class="o">.</span><span class="n">defaultdict</span><span class="p">(</span><span class="nb">list</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">roots</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">pcoll</span><span class="p">,</span> <span class="n">consumer_info</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pcoll_leaf_consumers_iter</span><span class="p">(</span><span class="n">transform_id</span><span class="p">):</span> |
| <span class="n">result</span><span class="p">[</span><span class="n">pcoll</span><span class="p">]</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">consumer_info</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">result</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.is_leaf"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.is_leaf">[docs]</a> <span class="k">def</span> <span class="nf">is_leaf</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_id</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="n">transform_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">leaf_composites</span> <span class="ow">or</span> |
| <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span><span class="o">.</span><span class="n">subtransforms</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.info"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.info">[docs]</a> <span class="k">def</span> <span class="nf">info</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">highlighted</span><span class="p">)</span> <span class="o">!=</span> <span class="mi">1</span><span class="p">:</span> |
| <span class="k">return</span> <span class="s1">''</span> |
| <span class="n">transform_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">highlighted</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="k">return</span> <span class="sa">f</span><span class="s1">'<pre></span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">[</span><span class="n">transform_id</span><span class="p">]</span><span class="si">}</span><span class="s1"></pre>'</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.layout_dot"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.layout_dot">[docs]</a> <span class="k">def</span> <span class="nf">layout_dot</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">layout</span> <span class="o">=</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">run</span><span class="p">([</span><span class="s1">'dot'</span><span class="p">,</span> <span class="s1">'-Tdot'</span><span class="p">],</span> |
| <span class="nb">input</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">to_dot</span><span class="p">()</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">),</span> |
| <span class="n">capture_output</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">check</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span><span class="o">.</span><span class="n">stdout</span> |
| |
| <span class="c1"># Try to capture the positions for layout consistency.</span> |
| <span class="n">json_out</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span> |
| <span class="n">subprocess</span><span class="o">.</span><span class="n">run</span><span class="p">([</span><span class="s1">'dot'</span><span class="p">,</span> <span class="s1">'-n2'</span><span class="p">,</span> <span class="s1">'-Kneato'</span><span class="p">,</span> <span class="s1">'-Tjson'</span><span class="p">],</span> |
| <span class="nb">input</span><span class="o">=</span><span class="n">layout</span><span class="p">,</span> |
| <span class="n">capture_output</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">check</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span><span class="o">.</span><span class="n">stdout</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">box</span> <span class="ow">in</span> <span class="n">json_out</span><span class="p">[</span><span class="s1">'objects'</span><span class="p">]:</span> |
| <span class="n">name</span> <span class="o">=</span> <span class="n">box</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'name'</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">components</span><span class="o">.</span><span class="n">transforms</span><span class="p">:</span> |
| <span class="k">if</span> <span class="s1">'pos'</span> <span class="ow">in</span> <span class="n">box</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">box</span><span class="p">[</span><span class="s1">'pos'</span><span class="p">]</span> |
| <span class="k">elif</span> <span class="s1">'bb'</span> <span class="ow">in</span> <span class="n">box</span><span class="p">:</span> |
| <span class="n">x0</span><span class="p">,</span> <span class="n">y0</span><span class="p">,</span> <span class="n">x1</span><span class="p">,</span> <span class="n">y1</span> <span class="o">=</span> <span class="p">[</span><span class="nb">float</span><span class="p">(</span><span class="n">r</span><span class="p">)</span> <span class="k">for</span> <span class="n">r</span> <span class="ow">in</span> <span class="n">box</span><span class="p">[</span><span class="s1">'bb'</span><span class="p">]</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">','</span><span class="p">)]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">latest_positions</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="sa">f</span><span class="s1">'</span><span class="si">{</span><span class="p">(</span><span class="n">x0</span><span class="o">+</span><span class="n">x1</span><span class="p">)</span><span class="o">/</span><span class="mi">2</span><span class="si">}</span><span class="s1">,</span><span class="si">{</span><span class="p">(</span><span class="n">y0</span><span class="o">+</span><span class="n">y1</span><span class="p">)</span><span class="o">/</span><span class="mi">2</span><span class="si">}</span><span class="s1">'</span> |
| |
| <span class="k">return</span> <span class="n">layout</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.page_callback_data"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.page_callback_data">[docs]</a> <span class="k">def</span> <span class="nf">page_callback_data</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">layout</span><span class="p">):</span> |
| <span class="n">svg</span> <span class="o">=</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">run</span><span class="p">([</span><span class="s1">'dot'</span><span class="p">,</span> <span class="s1">'-Kneato'</span><span class="p">,</span> <span class="s1">'-n2'</span><span class="p">,</span> <span class="s1">'-Tsvg'</span><span class="p">],</span> |
| <span class="nb">input</span><span class="o">=</span><span class="n">layout</span><span class="p">,</span> |
| <span class="n">capture_output</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">check</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span><span class="o">.</span><span class="n">stdout</span> |
| <span class="n">cmapx</span> <span class="o">=</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">run</span><span class="p">([</span><span class="s1">'dot'</span><span class="p">,</span> <span class="s1">'-Kneato'</span><span class="p">,</span> <span class="s1">'-n2'</span><span class="p">,</span> <span class="s1">'-Tcmapx'</span><span class="p">],</span> |
| <span class="nb">input</span><span class="o">=</span><span class="n">layout</span><span class="p">,</span> |
| <span class="n">capture_output</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">check</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span><span class="o">.</span><span class="n">stdout</span> |
| |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="s1">'src'</span><span class="p">:</span> <span class="s1">'data:image/svg+xml;base64,'</span> <span class="o">+</span> |
| <span class="n">base64</span><span class="o">.</span><span class="n">b64encode</span><span class="p">(</span><span class="n">svg</span><span class="p">)</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">),</span> |
| <span class="s1">'cmapx'</span><span class="p">:</span> <span class="n">cmapx</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">),</span> |
| <span class="s1">'info'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">info</span><span class="p">(),</span> |
| <span class="p">}</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.render_data"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.render_data">[docs]</a> <span class="k">def</span> <span class="nf">render_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Re-rendering pipeline..."</span><span class="p">)</span> |
| <span class="n">layout</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">layout_dot</span><span class="p">()</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">render_output</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">path</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">render_output</span><span class="p">:</span> |
| <span class="nb">format</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">splitext</span><span class="p">(</span><span class="n">path</span><span class="p">)[</span><span class="o">-</span><span class="mi">1</span><span class="p">][</span><span class="mi">1</span><span class="p">:]</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">run</span><span class="p">(</span> |
| <span class="p">[</span><span class="s1">'dot'</span><span class="p">,</span> <span class="s1">'-Kneato'</span><span class="p">,</span> <span class="s1">'-n2'</span><span class="p">,</span> <span class="s1">'-T'</span> <span class="o">+</span> <span class="nb">format</span><span class="p">,</span> <span class="s1">'-o'</span><span class="p">,</span> <span class="n">path</span><span class="p">],</span> |
| <span class="nb">input</span><span class="o">=</span><span class="n">layout</span><span class="p">,</span> |
| <span class="n">check</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">result</span><span class="o">.</span><span class="n">returncode</span><span class="p">:</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">error</span><span class="p">(</span> |
| <span class="s2">"Failed render pipeline as </span><span class="si">%r</span><span class="s2">: exit </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">result</span><span class="o">.</span><span class="n">returncode</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Rendered pipeline as </span><span class="si">%r</span><span class="s2">"</span><span class="p">,</span> <span class="n">path</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">page_callback_data</span><span class="p">(</span><span class="n">layout</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.render_json"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.render_json">[docs]</a> <span class="k">def</span> <span class="nf">render_json</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">render_data</span><span class="p">())</span></div> |
| |
| <div class="viewcode-block" id="PipelineRenderer.page"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.PipelineRenderer.page">[docs]</a> <span class="k">def</span> <span class="nf">page</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">data</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">render_data</span><span class="p">()</span> |
| <span class="n">src</span> <span class="o">=</span> <span class="n">data</span><span class="p">[</span><span class="s1">'src'</span><span class="p">]</span> |
| <span class="n">cmapx</span> <span class="o">=</span> <span class="n">data</span><span class="p">[</span><span class="s1">'cmapx'</span><span class="p">]</span> |
| <span class="k">return</span> <span class="s2">"""</span> |
| <span class="s2"> <html></span> |
| <span class="s2"> <head></span> |
| <span class="s2"> <script></span> |
| <span class="s2"> function click(transform_id) {</span> |
| <span class="s2"> var xhttp = new XMLHttpRequest();</span> |
| <span class="s2"> xhttp.onreadystatechange = function() {</span> |
| <span class="s2"> render_data = JSON.parse(this.responseText);</span> |
| <span class="s2"> document.getElementById('image_map_holder').innerHTML =</span> |
| <span class="s2"> render_data.cmapx;</span> |
| <span class="s2"> document.getElementById('image_tag').src = render_data.src</span> |
| <span class="s2"> document.getElementById('info').innerHTML = render_data.info</span> |
| <span class="s2"> };</span> |
| <span class="s2"> xhttp.open("GET", "render?toggle=" + transform_id, true);</span> |
| <span class="s2"> xhttp.send();</span> |
| <span class="s2"> }</span> |
| |
| <span class="s2"> </script></span> |
| <span class="s2"> </head></span> |
| <span class="s2"> """</span> <span class="o">+</span> <span class="sa">f</span><span class="s2">"""</span> |
| <span class="s2"> <body></span> |
| <span class="s2"> Click on a composite transform to expand.</span> |
| <span class="s2"> <br></span> |
| <span class="s2"> <img id='image_tag' src='</span><span class="si">{</span><span class="n">src</span><span class="si">}</span><span class="s2">' usemap='#G'></span> |
| <span class="s2"> <hr></span> |
| <span class="s2"> <div id='info'></div></span> |
| <span class="s2"> <div id='image_map_holder'></span> |
| <span class="s2"> </span><span class="si">{</span><span class="n">cmapx</span><span class="si">}</span> |
| <span class="s2"> </div></span> |
| <span class="s2"> </body></span> |
| <span class="s2"> </html></span> |
| <span class="s2"> """</span></div></div> |
| |
| |
| <div class="viewcode-block" id="RenderRunner"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.RenderRunner">[docs]</a><span class="k">class</span> <span class="nc">RenderRunner</span><span class="p">(</span><span class="n">runner</span><span class="o">.</span><span class="n">PipelineRunner</span><span class="p">):</span> |
| <span class="c1"># TODO(robertwb): Consider making this a runner wrapper, where live status</span> |
| <span class="c1"># (such as counters, stage completion status, or possibly even PCollection</span> |
| <span class="c1"># samples) queryable and/or displayed. This could evolve into a full Beam</span> |
| <span class="c1"># UI.</span> |
| <div class="viewcode-block" id="RenderRunner.run_pipeline"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.RenderRunner.run_pipeline">[docs]</a> <span class="k">def</span> <span class="nf">run_pipeline</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pipeline_object</span><span class="p">,</span> <span class="n">options</span><span class="p">,</span> <span class="n">pipeline_proto</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">pipeline_proto</span><span class="p">:</span> |
| <span class="n">pipeline_proto</span> <span class="o">=</span> <span class="n">pipeline_object</span><span class="o">.</span><span class="n">to_runner_api</span><span class="p">()</span> |
| <span class="n">render_options</span> <span class="o">=</span> <span class="n">options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">RenderOptions</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">render_options</span><span class="o">.</span><span class="n">log_proto</span><span class="p">:</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="n">pipeline_proto</span><span class="p">)</span> |
| <span class="n">renderer</span> <span class="o">=</span> <span class="n">PipelineRenderer</span><span class="p">(</span><span class="n">pipeline_proto</span><span class="p">,</span> <span class="n">render_options</span><span class="p">)</span> |
| <span class="n">renderer</span><span class="o">.</span><span class="n">page</span><span class="p">()</span> |
| |
| <span class="k">if</span> <span class="n">render_options</span><span class="o">.</span><span class="n">render_port</span> <span class="o">>=</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="c1"># TODO: If this gets more complex, we could consider taking on a</span> |
| <span class="c1"># framework like Flask as a dependency.</span> |
| <span class="k">class</span> <span class="nc">RequestHandler</span><span class="p">(</span><span class="n">http</span><span class="o">.</span><span class="n">server</span><span class="o">.</span><span class="n">BaseHTTPRequestHandler</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">do_GET</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">parts</span> <span class="o">=</span> <span class="n">urllib</span><span class="o">.</span><span class="n">parse</span><span class="o">.</span><span class="n">urlparse</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="p">)</span> |
| <span class="n">args</span> <span class="o">=</span> <span class="n">urllib</span><span class="o">.</span><span class="n">parse</span><span class="o">.</span><span class="n">parse_qs</span><span class="p">(</span><span class="n">parts</span><span class="o">.</span><span class="n">query</span><span class="p">)</span> |
| <span class="n">renderer</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="o">**</span><span class="n">args</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">parts</span><span class="o">.</span><span class="n">path</span> <span class="o">==</span> <span class="s1">'/'</span><span class="p">:</span> |
| <span class="n">response</span> <span class="o">=</span> <span class="n">renderer</span><span class="o">.</span><span class="n">page</span><span class="p">()</span> |
| <span class="k">elif</span> <span class="n">parts</span><span class="o">.</span><span class="n">path</span> <span class="o">==</span> <span class="s1">'/render'</span><span class="p">:</span> |
| <span class="n">response</span> <span class="o">=</span> <span class="n">renderer</span><span class="o">.</span><span class="n">render_json</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">send_response</span><span class="p">(</span><span class="mi">400</span><span class="p">)</span> |
| <span class="k">return</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">send_response</span><span class="p">(</span><span class="mi">200</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">send_header</span><span class="p">(</span><span class="s2">"Content-type"</span><span class="p">,</span> <span class="s2">"text/html"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">end_headers</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">wfile</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">response</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">))</span> |
| |
| <span class="n">server</span> <span class="o">=</span> <span class="n">http</span><span class="o">.</span><span class="n">server</span><span class="o">.</span><span class="n">HTTPServer</span><span class="p">((</span><span class="s1">'localhost'</span><span class="p">,</span> <span class="n">render_options</span><span class="o">.</span><span class="n">render_port</span><span class="p">),</span> |
| <span class="n">RequestHandler</span><span class="p">)</span> |
| <span class="n">server_thread</span> <span class="o">=</span> <span class="n">threading</span><span class="o">.</span><span class="n">Thread</span><span class="p">(</span><span class="n">target</span><span class="o">=</span><span class="n">server</span><span class="o">.</span><span class="n">serve_forever</span><span class="p">,</span> <span class="n">daemon</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> |
| <span class="n">server_thread</span><span class="o">.</span><span class="n">start</span><span class="p">()</span> |
| <span class="nb">print</span><span class="p">(</span><span class="s1">'Serving at http://</span><span class="si">%s</span><span class="s1">:</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="n">server</span><span class="o">.</span><span class="n">server_address</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">RenderPipelineResult</span><span class="p">(</span><span class="n">server</span><span class="p">)</span> |
| |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">RenderPipelineResult</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span></div></div> |
| |
| |
| <div class="viewcode-block" id="RenderPipelineResult"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.RenderPipelineResult">[docs]</a><span class="k">class</span> <span class="nc">RenderPipelineResult</span><span class="p">(</span><span class="n">runner</span><span class="o">.</span><span class="n">PipelineResult</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">server</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">runner</span><span class="o">.</span><span class="n">PipelineState</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">server</span> <span class="o">=</span> <span class="n">server</span> |
| |
| <div class="viewcode-block" id="RenderPipelineResult.wait_until_finish"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.RenderPipelineResult.wait_until_finish">[docs]</a> <span class="k">def</span> <span class="nf">wait_until_finish</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">duration</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">server</span><span class="p">:</span> |
| <span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="n">duration</span> <span class="ow">or</span> <span class="mf">1e8</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">server</span><span class="o">.</span><span class="n">shutdown</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_state</span> <span class="o">=</span> <span class="n">runner</span><span class="o">.</span><span class="n">PipelineState</span><span class="o">.</span><span class="n">DONE</span></div> |
| |
| <div class="viewcode-block" id="RenderPipelineResult.monitoring_infos"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.RenderPipelineResult.monitoring_infos">[docs]</a> <span class="k">def</span> <span class="nf">monitoring_infos</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">[]</span></div></div> |
| |
| |
| <div class="viewcode-block" id="run"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.run">[docs]</a><span class="k">def</span> <span class="nf">run</span><span class="p">(</span><span class="n">argv</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">argv</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="o">==</span> <span class="vm">__file__</span><span class="p">:</span> |
| <span class="n">argv</span> <span class="o">=</span> <span class="n">argv</span><span class="p">[</span><span class="mi">1</span><span class="p">:]</span> |
| <span class="n">parser</span> <span class="o">=</span> <span class="n">argparse</span><span class="o">.</span><span class="n">ArgumentParser</span><span class="p">(</span> |
| <span class="n">description</span><span class="o">=</span><span class="vm">__doc__</span><span class="p">,</span> <span class="n">formatter_class</span><span class="o">=</span><span class="n">argparse</span><span class="o">.</span><span class="n">RawDescriptionHelpFormatter</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--job_port'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'port on which to serve the job api'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--pipeline_proto'</span><span class="p">,</span> <span class="n">help</span><span class="o">=</span><span class="s1">'file containing the beam pipeline definition'</span><span class="p">)</span> |
| <span class="n">RenderOptions</span><span class="o">.</span><span class="n">_add_argparse_args</span><span class="p">(</span><span class="n">parser</span><span class="p">)</span> |
| <span class="n">options</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">(</span><span class="n">argv</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">options</span><span class="o">.</span><span class="n">pipeline_proto</span><span class="p">:</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">options</span><span class="o">.</span><span class="n">render_output</span> <span class="ow">and</span> <span class="n">options</span><span class="o">.</span><span class="n">render_port</span> <span class="o"><</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="n">options</span><span class="o">.</span><span class="n">render_port</span> <span class="o">=</span> <span class="mi">0</span> |
| |
| <span class="n">render_one</span><span class="p">(</span><span class="n">options</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">options</span><span class="o">.</span><span class="n">render_output</span><span class="p">:</span> |
| <span class="k">return</span> |
| |
| <span class="n">run_server</span><span class="p">(</span><span class="n">options</span><span class="p">)</span></div> |
| |
| |
| <div class="viewcode-block" id="render_one"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.render_one">[docs]</a><span class="k">def</span> <span class="nf">render_one</span><span class="p">(</span><span class="n">options</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">options</span><span class="o">.</span><span class="n">pipeline_proto</span> <span class="o">==</span> <span class="s1">'-'</span><span class="p">:</span> |
| <span class="n">content</span> <span class="o">=</span> <span class="n">sys</span><span class="o">.</span><span class="n">stdin</span><span class="o">.</span><span class="n">buffer</span><span class="o">.</span><span class="n">read</span><span class="p">()</span> |
| <span class="k">if</span> <span class="n">content</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="o">==</span> <span class="sa">b</span><span class="s1">'{'</span><span class="p">:</span> |
| <span class="n">ext</span> <span class="o">=</span> <span class="s1">'.json'</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">content</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">)</span> |
| <span class="n">ext</span> <span class="o">=</span> <span class="s1">'.textproto'</span> |
| <span class="k">except</span> <span class="ne">UnicodeDecodeError</span><span class="p">:</span> |
| <span class="n">ext</span> <span class="o">=</span> <span class="s1">'.pb'</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">options</span><span class="o">.</span><span class="n">pipeline_proto</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">'gs://'</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">gcsio</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ImportError</span><span class="p">(</span><span class="s1">'GCS not available; please install apache_beam[gcp]'</span><span class="p">)</span> |
| <span class="n">open_fn</span> <span class="o">=</span> <span class="n">gcsio</span><span class="o">.</span><span class="n">GcsIO</span><span class="p">()</span><span class="o">.</span><span class="n">open</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">open_fn</span> <span class="o">=</span> <span class="nb">open</span> |
| |
| <span class="k">with</span> <span class="n">open_fn</span><span class="p">(</span><span class="n">options</span><span class="o">.</span><span class="n">pipeline_proto</span><span class="p">,</span> <span class="s1">'rb'</span><span class="p">)</span> <span class="k">as</span> <span class="n">fin</span><span class="p">:</span> |
| <span class="n">content</span> <span class="o">=</span> <span class="n">fin</span><span class="o">.</span><span class="n">read</span><span class="p">()</span> |
| <span class="n">ext</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">splitext</span><span class="p">(</span><span class="n">options</span><span class="o">.</span><span class="n">pipeline_proto</span><span class="p">)[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span> |
| |
| <span class="k">if</span> <span class="n">ext</span> <span class="o">==</span> <span class="s1">'.textproto'</span><span class="p">:</span> |
| <span class="n">pipeline_proto</span> <span class="o">=</span> <span class="n">text_format</span><span class="o">.</span><span class="n">Parse</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">())</span> |
| <span class="k">elif</span> <span class="n">ext</span> <span class="o">==</span> <span class="s1">'.json'</span><span class="p">:</span> |
| <span class="n">pipeline_proto</span> <span class="o">=</span> <span class="n">json_format</span><span class="o">.</span><span class="n">Parse</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">())</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">pipeline_proto</span> <span class="o">=</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">()</span> |
| <span class="n">pipeline_proto</span><span class="o">.</span><span class="n">ParseFromString</span><span class="p">(</span><span class="n">content</span><span class="p">)</span> |
| |
| <span class="n">RenderRunner</span><span class="p">()</span><span class="o">.</span><span class="n">run_pipeline</span><span class="p">(</span> |
| <span class="kc">None</span><span class="p">,</span> <span class="n">pipeline_options</span><span class="o">.</span><span class="n">PipelineOptions</span><span class="p">(</span><span class="o">**</span><span class="nb">vars</span><span class="p">(</span><span class="n">options</span><span class="p">)),</span> <span class="n">pipeline_proto</span><span class="p">)</span></div> |
| |
| |
| <div class="viewcode-block" id="run_server"><a class="viewcode-back" href="../../../apache_beam.runners.render.html#apache_beam.runners.render.run_server">[docs]</a><span class="k">def</span> <span class="nf">run_server</span><span class="p">(</span><span class="n">options</span><span class="p">):</span> |
| <span class="k">class</span> <span class="nc">RenderBeamJob</span><span class="p">(</span><span class="n">local_job_service</span><span class="o">.</span><span class="n">BeamJob</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">_invoke_runner</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">RenderRunner</span><span class="p">()</span><span class="o">.</span><span class="n">run_pipeline</span><span class="p">(</span> |
| <span class="kc">None</span><span class="p">,</span> |
| <span class="n">pipeline_options</span><span class="o">.</span><span class="n">PipelineOptions</span><span class="p">(</span><span class="o">**</span><span class="nb">vars</span><span class="p">(</span><span class="n">options</span><span class="p">)),</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_pipeline_proto</span><span class="p">)</span> |
| |
| <span class="k">with</span> <span class="n">tempfile</span><span class="o">.</span><span class="n">TemporaryDirectory</span><span class="p">()</span> <span class="k">as</span> <span class="n">staging_dir</span><span class="p">:</span> |
| <span class="n">job_servicer</span> <span class="o">=</span> <span class="n">local_job_service</span><span class="o">.</span><span class="n">LocalJobServicer</span><span class="p">(</span> |
| <span class="n">staging_dir</span><span class="p">,</span> <span class="n">beam_job_type</span><span class="o">=</span><span class="n">RenderBeamJob</span><span class="p">)</span> |
| <span class="n">port</span> <span class="o">=</span> <span class="n">job_servicer</span><span class="o">.</span><span class="n">start_grpc_server</span><span class="p">(</span><span class="n">options</span><span class="o">.</span><span class="n">job_port</span><span class="p">)</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">local_job_service_main</span><span class="o">.</span><span class="n">serve</span><span class="p">(</span> |
| <span class="s2">"Listening for beam jobs on port </span><span class="si">%d</span><span class="s2">."</span> <span class="o">%</span> <span class="n">port</span><span class="p">,</span> <span class="n">job_servicer</span><span class="p">)</span> |
| <span class="k">finally</span><span class="p">:</span> |
| <span class="n">job_servicer</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span></div> |
| |
| |
| <span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s1">'__main__'</span><span class="p">:</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">basicConfig</span><span class="p">()</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">()</span><span class="o">.</span><span class="n">setLevel</span><span class="p">(</span><span class="n">logging</span><span class="o">.</span><span class="n">INFO</span><span class="p">)</span> |
| <span class="n">run</span><span class="p">(</span><span class="n">sys</span><span class="o">.</span><span class="n">argv</span><span class="p">)</span> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| <footer> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <p> |
| © Copyright |
| |
| </p> |
| </div> |
| Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <script type="text/javascript"> |
| jQuery(function () { |
| SphinxRtdTheme.Navigation.enable(true); |
| }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |