<!-- blob: c0c767b028df8becb1d22bd9cd783a555af039d9 [file] [log] [blame] -->
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>apache_beam.runners.interactive.user_pipeline_tracker &mdash; Apache Beam 2.38.0 documentation</title>
<script type="text/javascript" src="../../../../_static/js/modernizr.min.js"></script>
<script type="text/javascript" id="documentation_options" data-url_root="../../../../" src="../../../../_static/documentation_options.js"></script>
<script type="text/javascript" src="../../../../_static/jquery.js"></script>
<script type="text/javascript" src="../../../../_static/underscore.js"></script>
<script type="text/javascript" src="../../../../_static/doctools.js"></script>
<script type="text/javascript" src="../../../../_static/language_data.js"></script>
<script async="async" type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
<script type="text/javascript" src="../../../../_static/js/theme.js"></script>
<link rel="stylesheet" href="../../../../_static/css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../../../_static/pygments.css" type="text/css" />
<link rel="index" title="Index" href="../../../../genindex.html" />
<link rel="search" title="Search" href="../../../../search.html" />
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../../index.html" class="icon icon-home"> Apache Beam
</a>
<div class="version">
2.38.0
</div>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.ml.html">apache_beam.ml package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.utils.html">apache_beam.utils package</a></li>
</ul>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../../index.html">Module code</a> &raquo;</li>
<li>apache_beam.runners.interactive.user_pipeline_tracker</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.runners.interactive.user_pipeline_tracker</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Class that tracks derived/pipeline fragments from user pipelines.</span>
<span class="sd">For internal use only; no backwards-compatibility guarantees.</span>
<span class="sd">In the InteractiveRunner the design is to keep the user pipeline unchanged,</span>
<span class="sd">create a copy of the user pipeline, and modify the copy. When the derived</span>
<span class="sd">pipeline runs, there should only be per-user pipeline state. This makes sure</span>
<span class="sd">that derived pipelines can link back to the parent user pipeline.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="kn">import</span> <span class="nn">shutil</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Iterator</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Optional</span>
<span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span> <span class="c1"># type: ignore</span>
<div class="viewcode-block" id="UserPipelineTracker"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.user_pipeline_tracker.html#apache_beam.runners.interactive.user_pipeline_tracker.UserPipelineTracker">[docs]</a><span class="k">class</span> <span class="nc">UserPipelineTracker</span><span class="p">:</span>
<span class="sd">&quot;&quot;&quot;Tracks user pipelines from derived pipelines.</span>
<span class="sd"> This data structure is similar to a disjoint set data structure. A derived</span>
<span class="sd"> pipeline can only have one parent user pipeline. A user pipeline can have many</span>
<span class="sd"> derived pipelines.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_user_pipelines</span><span class="p">:</span> <span class="nb">dict</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">,</span> <span class="nb">list</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">]]</span> <span class="o">=</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_derived_pipelines</span><span class="p">:</span> <span class="nb">dict</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">]</span> <span class="o">=</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pid_to_pipelines</span><span class="p">:</span> <span class="nb">dict</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">]</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">def</span> <span class="fm">__iter__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">]:</span>
<span class="sd">&quot;&quot;&quot;Iterates through all the user pipelines.&quot;&quot;&quot;</span>
<span class="k">for</span> <span class="n">p</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_user_pipelines</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">p</span>
<span class="k">def</span> <span class="nf">_key</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pipeline</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">str</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="nb">id</span><span class="p">(</span><span class="n">pipeline</span><span class="p">))</span>
<div class="viewcode-block" id="UserPipelineTracker.evict"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.user_pipeline_tracker.html#apache_beam.runners.interactive.user_pipeline_tracker.UserPipelineTracker.evict">[docs]</a> <span class="k">def</span> <span class="nf">evict</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pipeline</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="sd">&quot;&quot;&quot;Evicts the pipeline.</span>
<span class="sd"> Removes the given pipeline and derived pipelines if a user pipeline.</span>
<span class="sd"> Otherwise, removes the given derived pipeline.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">user_pipeline</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_user_pipeline</span><span class="p">(</span><span class="n">pipeline</span><span class="p">)</span>
<span class="k">if</span> <span class="n">user_pipeline</span><span class="p">:</span>
<span class="k">for</span> <span class="n">d</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_user_pipelines</span><span class="p">[</span><span class="n">user_pipeline</span><span class="p">]:</span>
<span class="k">del</span> <span class="bp">self</span><span class="o">.</span><span class="n">_derived_pipelines</span><span class="p">[</span><span class="n">d</span><span class="p">]</span>
<span class="k">del</span> <span class="bp">self</span><span class="o">.</span><span class="n">_user_pipelines</span><span class="p">[</span><span class="n">user_pipeline</span><span class="p">]</span>
<span class="k">elif</span> <span class="n">pipeline</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_derived_pipelines</span><span class="p">:</span>
<span class="k">del</span> <span class="bp">self</span><span class="o">.</span><span class="n">_derived_pipelines</span><span class="p">[</span><span class="n">pipeline</span><span class="p">]</span></div>
<div class="viewcode-block" id="UserPipelineTracker.clear"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.user_pipeline_tracker.html#apache_beam.runners.interactive.user_pipeline_tracker.UserPipelineTracker.clear">[docs]</a> <span class="k">def</span> <span class="nf">clear</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="sd">&quot;&quot;&quot;Clears the tracker of all user and derived pipelines.&quot;&quot;&quot;</span>
<span class="c1"># Remove all local_tempdir of created pipelines.</span>
<span class="k">for</span> <span class="n">p</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pid_to_pipelines</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="n">shutil</span><span class="o">.</span><span class="n">rmtree</span><span class="p">(</span><span class="n">p</span><span class="o">.</span><span class="n">local_tempdir</span><span class="p">,</span> <span class="n">ignore_errors</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_user_pipelines</span><span class="o">.</span><span class="n">clear</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_derived_pipelines</span><span class="o">.</span><span class="n">clear</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pid_to_pipelines</span><span class="o">.</span><span class="n">clear</span><span class="p">()</span></div>
<div class="viewcode-block" id="UserPipelineTracker.get_pipeline"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.user_pipeline_tracker.html#apache_beam.runners.interactive.user_pipeline_tracker.UserPipelineTracker.get_pipeline">[docs]</a> <span class="k">def</span> <span class="nf">get_pipeline</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pid</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">]:</span>
<span class="sd">&quot;&quot;&quot;Returns the pipeline corresponding to the given pipeline id.&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pid_to_pipelines</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">pid</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span></div>
<div class="viewcode-block" id="UserPipelineTracker.add_user_pipeline"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.user_pipeline_tracker.html#apache_beam.runners.interactive.user_pipeline_tracker.UserPipelineTracker.add_user_pipeline">[docs]</a> <span class="k">def</span> <span class="nf">add_user_pipeline</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">p</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">:</span>
<span class="sd">&quot;&quot;&quot;Adds a user pipeline with an empty set of derived pipelines.&quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_memoize_pipieline</span><span class="p">(</span><span class="n">p</span><span class="p">)</span>
<span class="c1"># Create a new node for the user pipeline if it doesn&#39;t exist already.</span>
<span class="n">user_pipeline</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_user_pipeline</span><span class="p">(</span><span class="n">p</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">user_pipeline</span><span class="p">:</span>
<span class="n">user_pipeline</span> <span class="o">=</span> <span class="n">p</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_user_pipelines</span><span class="p">[</span><span class="n">p</span><span class="p">]</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">return</span> <span class="n">user_pipeline</span></div>
<span class="k">def</span> <span class="nf">_memoize_pipieline</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">p</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="sd">&quot;&quot;&quot;Memoizes the pid of the pipeline to the pipeline object.&quot;&quot;&quot;</span>
<span class="n">pid</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_key</span><span class="p">(</span><span class="n">p</span><span class="p">)</span>
<span class="k">if</span> <span class="n">pid</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pid_to_pipelines</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pid_to_pipelines</span><span class="p">[</span><span class="n">pid</span><span class="p">]</span> <span class="o">=</span> <span class="n">p</span>
<div class="viewcode-block" id="UserPipelineTracker.add_derived_pipeline"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.user_pipeline_tracker.html#apache_beam.runners.interactive.user_pipeline_tracker.UserPipelineTracker.add_derived_pipeline">[docs]</a> <span class="k">def</span> <span class="nf">add_derived_pipeline</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">maybe_user_pipeline</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">,</span>
<span class="n">derived_pipeline</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="sd">&quot;&quot;&quot;Adds a derived pipeline with the user pipeline.</span>
<span class="sd"> If the `maybe_user_pipeline` is a user pipeline, then the derived pipeline</span>
<span class="sd"> will be added to its set. Otherwise, the derived pipeline will be added to</span>
<span class="sd"> the user pipeline of the `maybe_user_pipeline`.</span>
<span class="sd"> By doing the above one can do:</span>
<span class="sd"> p = beam.Pipeline()</span>
<span class="sd"> derived1 = beam.Pipeline()</span>
<span class="sd"> derived2 = beam.Pipeline()</span>
<span class="sd"> ut = UserPipelineTracker()</span>
<span class="sd"> ut.add_derived_pipeline(p, derived1)</span>
<span class="sd"> ut.add_derived_pipeline(derived1, derived2)</span>
<span class="sd"> # Returns p.</span>
<span class="sd"> ut.get_user_pipeline(derived2)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_memoize_pipieline</span><span class="p">(</span><span class="n">maybe_user_pipeline</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_memoize_pipieline</span><span class="p">(</span><span class="n">derived_pipeline</span><span class="p">)</span>
<span class="c1"># Cannot add a derived pipeline twice.</span>
<span class="k">assert</span> <span class="n">derived_pipeline</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_derived_pipelines</span>
<span class="c1"># Get the &quot;true&quot; user pipeline. This allows for the user to derive a</span>
<span class="c1"># pipeline from another derived pipeline, use both as arguments, and this</span>
<span class="c1"># method will still get the correct user pipeline.</span>
<span class="n">user</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">add_user_pipeline</span><span class="p">(</span><span class="n">maybe_user_pipeline</span><span class="p">)</span>
<span class="c1"># Map the derived pipeline to the user pipeline.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_derived_pipelines</span><span class="p">[</span><span class="n">derived_pipeline</span><span class="p">]</span> <span class="o">=</span> <span class="n">user</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_user_pipelines</span><span class="p">[</span><span class="n">user</span><span class="p">]</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">derived_pipeline</span><span class="p">)</span></div>
<div class="viewcode-block" id="UserPipelineTracker.get_user_pipeline"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.user_pipeline_tracker.html#apache_beam.runners.interactive.user_pipeline_tracker.UserPipelineTracker.get_user_pipeline">[docs]</a> <span class="k">def</span> <span class="nf">get_user_pipeline</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">p</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">]:</span>
<span class="sd">&quot;&quot;&quot;Returns the user pipeline of the given pipeline.</span>
<span class="sd"> If the given pipeline has no user pipeline, i.e. not added to this tracker,</span>
<span class="sd"> then this returns None. If the given pipeline is a user pipeline then this</span>
<span class="sd"> returns the same pipeline. If the given pipeline is a derived pipeline then</span>
<span class="sd"> this returns the user pipeline.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># If `p` is a user pipeline then return it.</span>
<span class="k">if</span> <span class="n">p</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_user_pipelines</span><span class="p">:</span>
<span class="k">return</span> <span class="n">p</span>
<span class="c1"># If `p` exists then return its user pipeline.</span>
<span class="k">if</span> <span class="n">p</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_derived_pipelines</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_derived_pipelines</span><span class="p">[</span><span class="n">p</span><span class="p">]</span>
<span class="c1"># Otherwise, `p` is not in this tracker.</span>
<span class="k">return</span> <span class="kc">None</span></div></div>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
&copy; Copyright
</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script type="text/javascript">
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>