<!-- blob: fefe4840ade6b1f59168d3b6101d9990a965c640 [file] [log] [blame] -->
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>apache_beam.runners.interactive.interactive_beam &mdash; Apache Beam 2.47.0 documentation</title>

  <!--
    Classic (non-deferred) scripts, kept in their original order so they
    still execute in the sequence listed. The `type` attribute is omitted
    because JavaScript is the default for <script>.
  -->
  <script src="../../../../_static/js/modernizr.min.js"></script>
  <!-- The id and data-url_root attributes are kept verbatim as script hooks. -->
  <script id="documentation_options" data-url_root="../../../../" src="../../../../_static/documentation_options.js"></script>
  <script src="../../../../_static/jquery.js"></script>
  <script src="../../../../_static/underscore.js"></script>
  <script src="../../../../_static/doctools.js"></script>
  <script src="../../../../_static/language_data.js"></script>
  <!-- Third-party MathJax loader; fetched asynchronously so it does not block parsing. -->
  <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
  <script src="../../../../_static/js/theme.js"></script>

  <!-- Stylesheets: theme first, then the Pygments syntax-highlighting rules. -->
  <link rel="stylesheet" href="../../../../_static/css/theme.css">
  <link rel="stylesheet" href="../../../../_static/pygments.css">

  <!-- Document relationship links. -->
  <link rel="index" title="Index" href="../../../../genindex.html">
  <link rel="search" title="Search" href="../../../../search.html">
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<!-- Sidebar: project home link, version, search box and the top-level package/module index. -->
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
  <div class="wy-side-scroll">
    <div class="wy-side-nav-search">
      <a href="../../../../index.html" class="icon icon-home"> Apache Beam
      </a>
      <div class="version">
        2.47.0
      </div>
      <div role="search">
        <form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get">
          <!--
            The placeholder alone is not an accessible name, so aria-label
            supplies one. type="text" is kept as-is in case the theme
            stylesheet selects on it.
          -->
          <input type="text" name="q" placeholder="Search docs" aria-label="Search docs">
          <input type="hidden" name="check_keywords" value="yes">
          <input type="hidden" name="area" value="default">
        </form>
      </div>
    </div>
    <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
      <!-- Packages -->
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.coders.html">apache_beam.coders package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.io.html">apache_beam.io package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.ml.html">apache_beam.ml package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.options.html">apache_beam.options package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.portability.html">apache_beam.portability package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.runners.html">apache_beam.runners package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.testing.html">apache_beam.testing package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.utils.html">apache_beam.utils package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.yaml.html">apache_beam.yaml package</a></li>
      </ul>
      <!-- Top-level modules -->
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.error.html">apache_beam.error module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
      </ul>
    </div>
  </div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../../index.html">Module code</a> &raquo;</li>
<li>apache_beam.runners.interactive.interactive_beam</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.runners.interactive.interactive_beam</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Module of Interactive Beam features that can be used in notebook.</span>
<span class="sd">The purpose of the module is to reduce the learning curve of Interactive Beam</span>
<span class="sd">users, provide a single place for importing and add sugar syntax for all</span>
<span class="sd">Interactive Beam components. It gives users capability to interact with existing</span>
<span class="sd">environment/session/context for Interactive Beam and visualize PCollections as</span>
<span class="sd">bounded dataset. In the meantime, it hides the interactivity implementation</span>
<span class="sd">from users so that users can focus on developing Beam pipeline without worrying</span>
<span class="sd">about how hidden states in the interactive session are managed.</span>
<span class="sd">A convention to import this module:</span>
<span class="sd"> from apache_beam.runners.interactive import interactive_beam as ib</span>
<span class="sd">Note: If you want backward-compatibility, only invoke interfaces provided by</span>
<span class="sd">this module in your notebook or application code.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="c1"># pytype: skip-file</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">warnings</span>
<span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">timedelta</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Dict</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">List</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Optional</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Union</span>
<span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span>
<span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span>
<span class="kn">from</span> <span class="nn">apache_beam.dataframe.frame_base</span> <span class="kn">import</span> <span class="n">DeferredBase</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="kn">import</span> <span class="n">FlinkRunnerOptions</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.dataproc.dataproc_cluster_manager</span> <span class="kn">import</span> <span class="n">DataprocClusterManager</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.dataproc.types</span> <span class="kn">import</span> <span class="n">ClusterIdentifier</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.dataproc.types</span> <span class="kn">import</span> <span class="n">ClusterMetadata</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.display</span> <span class="kn">import</span> <span class="n">pipeline_graph</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.display.pcoll_visualization</span> <span class="kn">import</span> <span class="n">visualize</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.display.pcoll_visualization</span> <span class="kn">import</span> <span class="n">visualize_computed_pcoll</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.options</span> <span class="kn">import</span> <span class="n">interactive_options</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.utils</span> <span class="kn">import</span> <span class="n">deferred_df_to_pcollection</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.utils</span> <span class="kn">import</span> <span class="n">elements_to_df</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.utils</span> <span class="kn">import</span> <span class="n">find_pcoll_name</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.utils</span> <span class="kn">import</span> <span class="n">progress_indicated</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.runner</span> <span class="kn">import</span> <span class="n">PipelineState</span>
<span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>
<div class="viewcode-block" id="Options"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.Options">[docs]</a><span class="k">class</span> <span class="nc">Options</span><span class="p">(</span><span class="n">interactive_options</span><span class="o">.</span><span class="n">InteractiveOptions</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Options that guide how Interactive Beam works.&quot;&quot;&quot;</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">enable_recording_replay</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Whether replayable source data recorded should be replayed for multiple</span>
<span class="sd"> PCollection evaluations and pipeline runs as long as the data recorded is</span>
<span class="sd"> still valid.&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">capture_control</span><span class="o">.</span><span class="n">_enable_capture_replay</span>
<span class="nd">@enable_recording_replay</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">enable_recording_replay</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Sets whether source data recorded should be replayed. True - Enables</span>
<span class="sd"> recording of replayable source data so that following PCollection</span>
<span class="sd"> evaluations and pipeline runs always use the same data recorded;</span>
<span class="sd"> False - Disables recording of replayable source data so that following</span>
<span class="sd"> PCollection evaluation and pipeline runs always use new data from sources.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># This makes sure the log handler is configured correctly in case the</span>
<span class="c1"># options are configured in an early stage.</span>
<span class="n">_</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span>
<span class="k">if</span> <span class="n">value</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;Record replay is enabled. When a PCollection is evaluated or the &#39;</span>
<span class="s1">&#39;pipeline is executed, existing data recorded from previous &#39;</span>
<span class="s1">&#39;computations will be replayed for consistent results. If no &#39;</span>
<span class="s1">&#39;recorded data is available, new data from recordable sources will &#39;</span>
<span class="s1">&#39;be recorded.&#39;</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;Record replay is disabled. The next time a PCollection is &#39;</span>
<span class="s1">&#39;evaluated or the pipeline is executed, new data will always be &#39;</span>
<span class="s1">&#39;consumed from sources in the pipeline. You will not have &#39;</span>
<span class="s1">&#39;replayability until re-enabling this option.&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">capture_control</span><span class="o">.</span><span class="n">_enable_capture_replay</span> <span class="o">=</span> <span class="n">value</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">recordable_sources</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Interactive Beam automatically records data from sources in this set.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">capture_control</span><span class="o">.</span><span class="n">_capturable_sources</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">recording_duration</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;The data recording of sources ends as soon as the background source</span>
<span class="sd"> recording job has run for this long.&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">capture_control</span><span class="o">.</span><span class="n">_capture_duration</span>
<span class="nd">@recording_duration</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">recording_duration</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Sets the recording duration as a timedelta. The input can be a</span>
<span class="sd"> datetime.timedelta, a positive integer as seconds or a string</span>
<span class="sd"> representation that is parsable by pandas.to_timedelta.</span>
<span class="sd"> Example::</span>
<span class="sd"> # Sets the recording duration limit to 10 seconds.</span>
<span class="sd"> ib.options.recording_duration = timedelta(seconds=10)</span>
<span class="sd"> ib.options.recording_duration = 10</span>
<span class="sd"> ib.options.recording_duration = &#39;10s&#39;</span>
<span class="sd"> # Explicitly control the recordings.</span>
<span class="sd"> ib.recordings.stop(p)</span>
<span class="sd"> ib.recordings.clear(p)</span>
<span class="sd"> ib.recordings.record(p)</span>
<span class="sd"> # The next PCollection evaluation uses fresh data from sources,</span>
<span class="sd"> # and the data recorded will be replayed until another clear.</span>
<span class="sd"> ib.collect(some_pcoll)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">duration</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span>
<span class="k">assert</span> <span class="n">value</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">,</span> <span class="s1">&#39;Duration must be a positive value.&#39;</span>
<span class="n">duration</span> <span class="o">=</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">value</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="n">duration</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">to_timedelta</span><span class="p">(</span><span class="n">value</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="n">timedelta</span><span class="p">),</span> <span class="p">(</span><span class="s1">&#39;The input can only be a &#39;</span>
<span class="s1">&#39;datetime.timedelta, a positive integer as seconds, or a string &#39;</span>
<span class="s1">&#39;representation that is parsable by pandas.to_timedelta.&#39;</span><span class="p">)</span>
<span class="n">duration</span> <span class="o">=</span> <span class="n">value</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">capture_control</span><span class="o">.</span><span class="n">_capture_duration</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">(</span>
<span class="p">)</span> <span class="o">!=</span> <span class="n">duration</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">():</span>
<span class="n">_</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;You have changed recording duration from </span><span class="si">%s</span><span class="s1"> seconds to </span><span class="si">%s</span><span class="s1"> seconds. &#39;</span>
<span class="s1">&#39;To allow new data to be recorded for the updated duration the &#39;</span>
<span class="s1">&#39;next time a PCollection is evaluated or the pipeline is executed, &#39;</span>
<span class="s1">&#39;please invoke ib.recordings.stop, ib.recordings.clear and &#39;</span>
<span class="s1">&#39;ib.recordings.record.&#39;</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">capture_control</span><span class="o">.</span><span class="n">_capture_duration</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">(),</span>
<span class="n">duration</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">())</span>
<span class="bp">self</span><span class="o">.</span><span class="n">capture_control</span><span class="o">.</span><span class="n">_capture_duration</span> <span class="o">=</span> <span class="n">duration</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">recording_size_limit</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;The data recording of sources ends as soon as the size (in bytes) of data</span>
<span class="sd"> recorded from recordable sources reaches the limit.&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">capture_control</span><span class="o">.</span><span class="n">_capture_size_limit</span>
<span class="nd">@recording_size_limit</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">recording_size_limit</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Sets the recording size in bytes.</span>
<span class="sd"> Example::</span>
<span class="sd"> # Sets the recording size limit to 1GB.</span>
<span class="sd"> interactive_beam.options.recording_size_limit = 1e9</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">capture_control</span><span class="o">.</span><span class="n">_capture_size_limit</span> <span class="o">!=</span> <span class="n">value</span><span class="p">:</span>
<span class="n">_</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;You have changed recording size limit from </span><span class="si">%s</span><span class="s1"> bytes to </span><span class="si">%s</span><span class="s1"> bytes. To &#39;</span>
<span class="s1">&#39;allow new data to be recorded under the updated size limit the &#39;</span>
<span class="s1">&#39;next time a PCollection is recorded or the pipeline is executed, &#39;</span>
<span class="s1">&#39;please invoke ib.recordings.stop, ib.recordings.clear and &#39;</span>
<span class="s1">&#39;ib.recordings.record.&#39;</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">capture_control</span><span class="o">.</span><span class="n">_capture_size_limit</span><span class="p">,</span>
<span class="n">value</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">capture_control</span><span class="o">.</span><span class="n">_capture_size_limit</span> <span class="o">=</span> <span class="n">value</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">display_timestamp_format</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;The format in which timestamps are displayed.</span>
<span class="sd"> Default is &#39;%Y-%m-%d %H:%M:%S.%f%z&#39;, e.g. 2020-02-01 15:05:06.000015-08:00.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_display_timestamp_format</span>
<span class="nd">@display_timestamp_format</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">display_timestamp_format</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Sets the format in which timestamps are displayed.</span>
<span class="sd"> Default is &#39;%Y-%m-%d %H:%M:%S.%f%z&#39;, e.g. 2020-02-01 15:05:06.000015-08:00.</span>
<span class="sd"> Example::</span>
<span class="sd"> # Sets the format to not display the timezone or microseconds.</span>
<span class="sd"> interactive_beam.options.display_timestamp_format = &#39;%Y-%m-%d %H:%M:%S&#39;</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_display_timestamp_format</span> <span class="o">=</span> <span class="n">value</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">display_timezone</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;The timezone in which timestamps are displayed.</span>
<span class="sd"> Defaults to local timezone.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_display_timezone</span>
<span class="nd">@display_timezone</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">display_timezone</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Sets the timezone (datetime.tzinfo) in which timestamps are displayed.</span>
<span class="sd"> Defaults to local timezone.</span>
<span class="sd"> Example::</span>
<span class="sd"> # Imports the timezone library.</span>
<span class="sd"> from pytz import timezone</span>
<span class="sd"> # Will display all timestamps in the US/Eastern time zone.</span>
<span class="sd"> tz = timezone(&#39;US/Eastern&#39;)</span>
<span class="sd"> # You can also use dateutil.tz to get a timezone.</span>
<span class="sd"> tz = dateutil.tz.gettz(&#39;US/Eastern&#39;)</span>
<span class="sd"> interactive_beam.options.display_timezone = tz</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_display_timezone</span> <span class="o">=</span> <span class="n">value</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">cache_root</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;The cache directory specified by the user.</span>
<span class="sd"> Defaults to None.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cache_root</span>
<span class="nd">@cache_root</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">cache_root</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Sets the cache directory.</span>
<span class="sd"> Defaults to None.</span>
<span class="sd"> Example of local directory usage::</span>
<span class="sd"> interactive_beam.options.cache_root = &#39;/Users/username/my/cache/dir&#39;</span>
<span class="sd"> Example of GCS directory usage::</span>
<span class="sd"> interactive_beam.options.cache_root = &#39;gs://my-gcs-bucket/cache/dir&#39;</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;Interactive Beam has detected a set value for the cache_root &#39;</span>
<span class="s1">&#39;option. Please note: existing cache managers will not have &#39;</span>
<span class="s1">&#39;their current cache directory changed. The option must be &#39;</span>
<span class="s1">&#39;set in Interactive Beam prior to the initialization of new &#39;</span>
<span class="s1">&#39;pipelines to take effect. To apply changes to new pipelines, &#39;</span>
<span class="s1">&#39;the kernel must be restarted or the pipeline creation codes &#39;</span>
<span class="s1">&#39;must be re-executed. &#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_cache_root</span> <span class="o">=</span> <span class="n">value</span></div>
<div class="viewcode-block" id="Recordings"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.Recordings">[docs]</a><span class="k">class</span> <span class="nc">Recordings</span><span class="p">():</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;An introspection interface for recordings for pipelines.</span>
<span class="sd"> When a user materializes a PCollection onto disk (e.g. ib.show) for a streaming</span>
<span class="sd"> pipeline, a background source recording job is started. This job pulls data</span>
<span class="sd"> from all defined unbounded sources for that PCollection&#39;s pipeline. The</span>
<span class="sd"> following methods allow for introspection into that background recording job.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="Recordings.describe"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.Recordings.describe">[docs]</a> <span class="k">def</span> <span class="nf">describe</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pipeline</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="c1"># type: (Optional[beam.Pipeline]) -&gt; dict[str, Any] # noqa: F821</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns a description of all the recordings for the given pipeline.</span>
<span class="sd"> If no pipeline is given then this returns a dictionary of descriptions for</span>
<span class="sd"> all pipelines.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Create the RecordingManager if it doesn&#39;t already exist.</span>
<span class="k">if</span> <span class="n">pipeline</span><span class="p">:</span>
<span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">get_recording_manager</span><span class="p">(</span><span class="n">pipeline</span><span class="p">,</span> <span class="n">create_if_absent</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="n">description</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">describe_all_recordings</span><span class="p">()</span>
<span class="k">if</span> <span class="n">pipeline</span><span class="p">:</span>
<span class="k">return</span> <span class="n">description</span><span class="p">[</span><span class="n">pipeline</span><span class="p">]</span>
<span class="k">return</span> <span class="n">description</span></div>
<div class="viewcode-block" id="Recordings.clear"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.Recordings.clear">[docs]</a> <span class="k">def</span> <span class="nf">clear</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pipeline</span><span class="p">):</span>
<span class="c1"># type: (beam.Pipeline) -&gt; bool</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Clears all recordings of the given pipeline. Returns True if cleared.&quot;&quot;&quot;</span>
<span class="n">description</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">describe</span><span class="p">(</span><span class="n">pipeline</span><span class="p">)</span>
<span class="k">if</span> <span class="p">(</span><span class="ow">not</span> <span class="n">PipelineState</span><span class="o">.</span><span class="n">is_terminal</span><span class="p">(</span><span class="n">description</span><span class="p">[</span><span class="s1">&#39;state&#39;</span><span class="p">])</span> <span class="ow">and</span>
<span class="n">description</span><span class="p">[</span><span class="s1">&#39;state&#39;</span><span class="p">]</span> <span class="o">!=</span> <span class="n">PipelineState</span><span class="o">.</span><span class="n">STOPPED</span><span class="p">):</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;Trying to clear a recording with a running pipeline. Did &#39;</span>
<span class="s1">&#39;you forget to call ib.recordings.stop?&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">cleanup</span><span class="p">(</span><span class="n">pipeline</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">True</span></div>
<div class="viewcode-block" id="Recordings.stop"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.Recordings.stop">[docs]</a> <span class="k">def</span> <span class="nf">stop</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pipeline</span><span class="p">):</span>
<span class="c1"># type: (beam.Pipeline) -&gt; None</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Stops the background source recording of the given pipeline.&quot;&quot;&quot;</span>
<span class="n">recording_manager</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">get_recording_manager</span><span class="p">(</span>
<span class="n">pipeline</span><span class="p">,</span> <span class="n">create_if_absent</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="n">recording_manager</span><span class="o">.</span><span class="n">cancel</span><span class="p">()</span></div>
<div class="viewcode-block" id="Recordings.record"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.Recordings.record">[docs]</a> <span class="k">def</span> <span class="nf">record</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pipeline</span><span class="p">):</span>
<span class="c1"># type: (beam.Pipeline) -&gt; bool</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Starts a background source recording job for the given pipeline. Returns</span>
<span class="sd"> True if the recording job was started.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">description</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">describe</span><span class="p">(</span><span class="n">pipeline</span><span class="p">)</span>
<span class="k">if</span> <span class="p">(</span><span class="ow">not</span> <span class="n">PipelineState</span><span class="o">.</span><span class="n">is_terminal</span><span class="p">(</span><span class="n">description</span><span class="p">[</span><span class="s1">&#39;state&#39;</span><span class="p">])</span> <span class="ow">and</span>
<span class="n">description</span><span class="p">[</span><span class="s1">&#39;state&#39;</span><span class="p">]</span> <span class="o">!=</span> <span class="n">PipelineState</span><span class="o">.</span><span class="n">STOPPED</span><span class="p">):</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;Trying to start a recording with a running pipeline. Did &#39;</span>
<span class="s1">&#39;you forget to call ib.recordings.stop?&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">if</span> <span class="n">description</span><span class="p">[</span><span class="s1">&#39;size&#39;</span><span class="p">]</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;A recording already exists for this pipeline. To start a &#39;</span>
<span class="s1">&#39;recording, make sure to call ib.recordings.clear first.&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="n">recording_manager</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">get_recording_manager</span><span class="p">(</span>
<span class="n">pipeline</span><span class="p">,</span> <span class="n">create_if_absent</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="k">return</span> <span class="n">recording_manager</span><span class="o">.</span><span class="n">record_pipeline</span><span class="p">()</span></div></div>
<div class="viewcode-block" id="Clusters"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.Clusters">[docs]</a><span class="k">class</span> <span class="nc">Clusters</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;An interface to control clusters implicitly created and managed by</span>
<span class="sd"> the current interactive environment. This class is not needed and</span>
<span class="sd"> should not be used otherwise.</span>
<span class="sd"> Do not use it for clusters a user explicitly manages: e.g., if you have</span>
<span class="sd"> a Flink cluster running somewhere and provide the flink master when</span>
<span class="sd"> running a pipeline with the FlinkRunner, the cluster will not be tracked</span>
<span class="sd"> or managed by Beam.</span>
<span class="sd"> To reuse the same cluster for your pipelines, use the same pipeline</span>
<span class="sd"> options: e.g., a pipeline option with the same flink master if you are</span>
<span class="sd"> using FlinkRunner.</span>
<span class="sd"> This module is experimental. No backwards-compatibility guarantees.</span>
<span class="sd"> Interactive Beam automatically creates/reuses existing worker clusters to</span>
<span class="sd"> execute pipelines when it detects the need from configurations.</span>
<span class="sd"> Currently, the only supported cluster implementation is Flink running on</span>
<span class="sd"> Cloud Dataproc.</span>
<span class="sd"> To configure a pipeline to run on Cloud Dataproc with Flink, set the</span>
<span class="sd"> underlying runner of the InteractiveRunner to FlinkRunner and the pipeline</span>
<span class="sd"> options to indicate where on Cloud the FlinkRunner should be deployed to.</span>
<span class="sd"> An example to enable automatic Dataproc cluster creation/reuse::</span>
<span class="sd"> options = PipelineOptions([</span>
<span class="sd"> &#39;--project=my-project&#39;,</span>
<span class="sd"> &#39;--region=my-region&#39;,</span>
<span class="sd"> &#39;--environment_type=DOCKER&#39;])</span>
<span class="sd"> pipeline = beam.Pipeline(InteractiveRunner(</span>
<span class="sd"> underlying_runner=FlinkRunner()), options=options)</span>
<span class="sd"> Reusing the pipeline options in another pipeline would configure Interactive Beam</span>
<span class="sd"> to reuse the same Dataproc cluster implicitly managed by the current</span>
<span class="sd"> interactive environment.</span>
<span class="sd"> If a flink_master is identified as a known cluster, the corresponding cluster</span>
<span class="sd"> is also reused.</span>
<span class="sd"> Furthermore, if a cluster is explicitly created by using a pipeline as an</span>
<span class="sd"> identifier to a known cluster, the cluster is reused.</span>
<span class="sd"> An example::</span>
<span class="sd"> # If pipeline runs on a known cluster, below code reuses the cluster</span>
<span class="sd"> # manager without creating a new one.</span>
<span class="sd"> dcm = ib.clusters.create(pipeline)</span>
<span class="sd"> To provision the cluster, use WorkerOptions. Supported configurations are::</span>
<span class="sd"> 1. subnetwork</span>
<span class="sd"> 2. num_workers</span>
<span class="sd"> 3. machine_type</span>
<span class="sd"> To configure a pipeline to run on an existing FlinkRunner deployed elsewhere,</span>
<span class="sd"> set the flink_master explicitly so no cluster will be created/reused.</span>
<span class="sd"> An example pipeline options to skip automatic Dataproc cluster usage::</span>
<span class="sd"> options = PipelineOptions([</span>
<span class="sd"> &#39;--flink_master=some.self.hosted.flink:port&#39;,</span>
<span class="sd"> &#39;--environment_type=DOCKER&#39;])</span>
<span class="sd"> To configure a pipeline to run on a local FlinkRunner, explicitly set the</span>
<span class="sd"> default cluster metadata to None: ib.clusters.set_default_cluster(None).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Explicitly set the Flink version here to ensure compatibility with 2.0</span>
<span class="c1"># Dataproc images:</span>
<span class="c1"># https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0</span>
<span class="n">DATAPROC_FLINK_VERSION</span> <span class="o">=</span> <span class="s1">&#39;1.12&#39;</span>
<span class="c1"># The minimum worker number to create a Dataproc cluster.</span>
<span class="n">DATAPROC_MINIMUM_WORKER_NUM</span> <span class="o">=</span> <span class="mi">2</span>
<span class="c1"># TODO(https://github.com/apache/beam/issues/21527): Fix the Dataproc image</span>
<span class="c1"># version after a released image contains all missing dependencies for Flink</span>
<span class="c1"># to run.</span>
<span class="c1"># DATAPROC_IMAGE_VERSION = &#39;2.0.XX-debian10&#39;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dataproc_cluster_managers</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="n">ClusterMetadata</span><span class="p">,</span>
<span class="n">DataprocClusterManager</span><span class="p">]</span> <span class="o">=</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">master_urls</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">ClusterMetadata</span><span class="p">]</span> <span class="o">=</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pipelines</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">,</span> <span class="n">DataprocClusterManager</span><span class="p">]</span> <span class="o">=</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">default_cluster_metadata</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">ClusterMetadata</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<div class="viewcode-block" id="Clusters.create"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.Clusters.create">[docs]</a> <span class="k">def</span> <span class="nf">create</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">cluster_identifier</span><span class="p">:</span> <span class="n">ClusterIdentifier</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">DataprocClusterManager</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Creates a Dataproc cluster manager provisioned for the cluster</span>
<span class="sd"> identified. If the cluster is known, returns an existing cluster manager.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Try to get some not-None cluster metadata.</span>
<span class="n">cluster_metadata</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">cluster_metadata</span><span class="p">(</span><span class="n">cluster_identifier</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">cluster_metadata</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;Unknown cluster identifier: </span><span class="si">%s</span><span class="s1">. Cannot create or reuse&#39;</span>
<span class="s1">&#39;a Dataproc cluster.&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">cluster_metadata</span><span class="o">.</span><span class="n">region</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;No region information was detected, defaulting Dataproc cluster &#39;</span>
<span class="s1">&#39;region to: us-central1.&#39;</span><span class="p">)</span>
<span class="n">cluster_metadata</span><span class="o">.</span><span class="n">region</span> <span class="o">=</span> <span class="s1">&#39;us-central1&#39;</span>
<span class="k">elif</span> <span class="n">cluster_metadata</span><span class="o">.</span><span class="n">region</span> <span class="o">==</span> <span class="s1">&#39;global&#39;</span><span class="p">:</span>
<span class="c1"># The global region is unsupported as it will be eventually deprecated.</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Clusters in the global region are not supported.&#39;</span><span class="p">)</span>
<span class="c1"># else use the provided region.</span>
<span class="k">if</span> <span class="p">(</span><span class="n">cluster_metadata</span><span class="o">.</span><span class="n">num_workers</span> <span class="ow">and</span>
<span class="n">cluster_metadata</span><span class="o">.</span><span class="n">num_workers</span> <span class="o">&lt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">DATAPROC_MINIMUM_WORKER_NUM</span><span class="p">):</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;At least </span><span class="si">%s</span><span class="s1"> workers are required for a cluster, defaulting to </span><span class="si">%s</span><span class="s1">.&#39;</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">DATAPROC_MINIMUM_WORKER_NUM</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">DATAPROC_MINIMUM_WORKER_NUM</span><span class="p">)</span>
<span class="n">cluster_metadata</span><span class="o">.</span><span class="n">num_workers</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">DATAPROC_MINIMUM_WORKER_NUM</span>
<span class="n">known_dcm</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">dataproc_cluster_managers</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">cluster_metadata</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">if</span> <span class="n">known_dcm</span><span class="p">:</span>
<span class="k">return</span> <span class="n">known_dcm</span>
<span class="n">dcm</span> <span class="o">=</span> <span class="n">DataprocClusterManager</span><span class="p">(</span><span class="n">cluster_metadata</span><span class="p">)</span>
<span class="n">dcm</span><span class="o">.</span><span class="n">create_flink_cluster</span><span class="p">()</span>
<span class="c1"># ClusterMetadata with derivative fields populated by the dcm.</span>
<span class="n">derived_meta</span> <span class="o">=</span> <span class="n">dcm</span><span class="o">.</span><span class="n">cluster_metadata</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dataproc_cluster_managers</span><span class="p">[</span><span class="n">derived_meta</span><span class="p">]</span> <span class="o">=</span> <span class="n">dcm</span>
<span class="bp">self</span><span class="o">.</span><span class="n">master_urls</span><span class="p">[</span><span class="n">derived_meta</span><span class="o">.</span><span class="n">master_url</span><span class="p">]</span> <span class="o">=</span> <span class="n">derived_meta</span>
<span class="c1"># Update the default cluster metadata to the one just created.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">set_default_cluster</span><span class="p">(</span><span class="n">derived_meta</span><span class="p">)</span>
<span class="k">return</span> <span class="n">dcm</span></div>
<div class="viewcode-block" id="Clusters.cleanup"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.Clusters.cleanup">[docs]</a> <span class="k">def</span> <span class="nf">cleanup</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">cluster_identifier</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">ClusterIdentifier</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">force</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Cleans up the cluster associated with the given cluster_identifier.</span>
<span class="sd"> When None cluster_identifier is provided: if force is True, cleans up for</span>
<span class="sd"> all clusters; otherwise, do a dry run and NOOP.</span>
<span class="sd"> If a beam.Pipeline is given as the ClusterIdentifier while multiple</span>
<span class="sd"> pipelines share the same cluster, it only cleans up the association between</span>
<span class="sd"> the pipeline and the cluster identified.</span>
<span class="sd"> If the cluster_identifier is unknown, NOOP.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">cluster_identifier</span><span class="p">:</span>
<span class="n">dcm_to_cleanup</span> <span class="o">=</span> <span class="nb">set</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dataproc_cluster_managers</span><span class="o">.</span><span class="n">values</span><span class="p">())</span>
<span class="k">if</span> <span class="n">force</span><span class="p">:</span>
<span class="k">for</span> <span class="n">dcm</span> <span class="ow">in</span> <span class="n">dcm_to_cleanup</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_cleanup</span><span class="p">(</span><span class="n">dcm</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">default_cluster_metadata</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;No cluster_identifier provided. If you intend to &#39;</span>
<span class="s1">&#39;clean up all clusters, invoke ib.clusters.cleanup(force=True). &#39;</span>
<span class="s1">&#39;Current clusters are </span><span class="si">%s</span><span class="s1">.&#39;</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">describe</span><span class="p">())</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">cluster_identifier</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">):</span>
<span class="n">p</span> <span class="o">=</span> <span class="n">cluster_identifier</span>
<span class="n">dcm</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipelines</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="n">p</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">if</span> <span class="n">dcm</span><span class="p">:</span>
<span class="n">dcm</span><span class="o">.</span><span class="n">pipelines</span><span class="o">.</span><span class="n">remove</span><span class="p">(</span><span class="n">p</span><span class="p">)</span>
<span class="n">warnings</span><span class="o">.</span><span class="n">filterwarnings</span><span class="p">(</span>
<span class="s1">&#39;ignore&#39;</span><span class="p">,</span>
<span class="s1">&#39;options is deprecated since First stable release. References to &#39;</span>
<span class="s1">&#39;&lt;pipeline&gt;.options will not be supported&#39;</span><span class="p">,</span>
<span class="n">category</span><span class="o">=</span><span class="ne">DeprecationWarning</span><span class="p">)</span>
<span class="n">p_flink_options</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">FlinkRunnerOptions</span><span class="p">)</span>
<span class="n">p_flink_options</span><span class="o">.</span><span class="n">flink_master</span> <span class="o">=</span> <span class="s1">&#39;[auto]&#39;</span>
<span class="n">p_flink_options</span><span class="o">.</span><span class="n">flink_version</span> <span class="o">=</span> <span class="kc">None</span>
<span class="c1"># Only cleans up when there is no pipeline using the cluster.</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">dcm</span><span class="o">.</span><span class="n">pipelines</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_cleanup</span><span class="p">(</span><span class="n">dcm</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">cluster_identifier</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="n">meta</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">master_urls</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">cluster_identifier</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">meta</span> <span class="o">=</span> <span class="n">cluster_identifier</span>
<span class="n">dcm</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">dataproc_cluster_managers</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">meta</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">if</span> <span class="n">dcm</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_cleanup</span><span class="p">(</span><span class="n">dcm</span><span class="p">)</span></div>
<div class="viewcode-block" id="Clusters.describe"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.Clusters.describe">[docs]</a> <span class="k">def</span> <span class="nf">describe</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">cluster_identifier</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">ClusterIdentifier</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Union</span><span class="p">[</span><span class="n">ClusterMetadata</span><span class="p">,</span> <span class="n">List</span><span class="p">[</span><span class="n">ClusterMetadata</span><span class="p">]]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Describes the ClusterMetadata by a ClusterIdentifier.</span>
<span class="sd"> If no cluster_identifier is given or if the cluster_identifier is unknown,</span>
<span class="sd"> it returns descriptions for all known clusters.</span>
<span class="sd"> Example usage:</span>
<span class="sd"> # Describe the cluster executing work for a pipeline.</span>
<span class="sd"> ib.clusters.describe(pipeline)</span>
<span class="sd"> # Describe the cluster with the flink master url.</span>
<span class="sd"> ib.clusters.describe(master_url)</span>
<span class="sd"> # Describe all existing clusters.</span>
<span class="sd"> ib.clusters.describe()</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">cluster_identifier</span><span class="p">:</span>
<span class="n">meta</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cluster_metadata</span><span class="p">(</span><span class="n">cluster_identifier</span><span class="p">)</span>
<span class="k">if</span> <span class="n">meta</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">dataproc_cluster_managers</span><span class="p">:</span>
<span class="k">return</span> <span class="n">meta</span>
<span class="k">return</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dataproc_cluster_managers</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span></div>
<div class="viewcode-block" id="Clusters.set_default_cluster"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.Clusters.set_default_cluster">[docs]</a> <span class="k">def</span> <span class="nf">set_default_cluster</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">cluster_identifier</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">ClusterIdentifier</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Temporarily sets the default metadata for creating or reusing a</span>
<span class="sd"> DataprocClusterManager. It is always updated to the most recently created</span>
<span class="sd"> cluster.</span>
<span class="sd"> If no known ClusterMetadata can be identified by the ClusterIdentifier, NOOP.</span>
<span class="sd"> If None is set, next time when Flink is in use, if no cluster is explicitly</span>
<span class="sd"> configured by a pipeline, the job runs locally.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">cluster_identifier</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">default_cluster_metadata</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">cluster_metadata</span><span class="p">(</span><span class="n">cluster_identifier</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">default_cluster_metadata</span> <span class="o">=</span> <span class="kc">None</span></div>
<div class="viewcode-block" id="Clusters.cluster_metadata"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.Clusters.cluster_metadata">[docs]</a> <span class="k">def</span> <span class="nf">cluster_metadata</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">cluster_identifier</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">ClusterIdentifier</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="n">ClusterMetadata</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Fetches the ClusterMetadata by a ClusterIdentifier that could be a</span>
<span class="sd"> URL in string, a Beam pipeline, or an equivalent to a known ClusterMetadata;</span>
<span class="sd"> If the given cluster_identifier is a URL or a pipeline that is unknown to</span>
<span class="sd"> the current environment, the default cluster metadata (could be None) is</span>
<span class="sd"> returned.</span>
<span class="sd"> If the given cluster_identifier is a ClusterMetadata but unknown to the</span>
<span class="sd"> current environment, passes it through (NOOP).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">meta</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cluster_metadata</span><span class="p">(</span><span class="n">cluster_identifier</span><span class="p">)</span>
<span class="k">return</span> <span class="n">meta</span> <span class="k">if</span> <span class="n">meta</span> <span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_cluster_metadata</span></div>
<span class="k">def</span> <span class="nf">_cluster_metadata</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">cluster_identifier</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">ClusterIdentifier</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="n">ClusterMetadata</span><span class="p">]:</span>
<span class="n">meta</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">cluster_identifier</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">cluster_identifier</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="n">meta</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">master_urls</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">cluster_identifier</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">cluster_identifier</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">):</span>
<span class="n">dcm</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pipelines</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">cluster_identifier</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">if</span> <span class="n">dcm</span><span class="p">:</span>
<span class="n">meta</span> <span class="o">=</span> <span class="n">dcm</span><span class="o">.</span><span class="n">cluster_metadata</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">cluster_identifier</span><span class="p">,</span> <span class="n">ClusterMetadata</span><span class="p">):</span>
<span class="n">meta</span> <span class="o">=</span> <span class="n">cluster_identifier</span>
<span class="k">if</span> <span class="n">meta</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">dataproc_cluster_managers</span><span class="p">:</span>
<span class="n">meta</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">dataproc_cluster_managers</span><span class="p">[</span><span class="n">meta</span><span class="p">]</span><span class="o">.</span><span class="n">cluster_metadata</span>
<span class="k">elif</span> <span class="p">(</span><span class="n">meta</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_cluster_metadata</span> <span class="ow">and</span>
<span class="n">meta</span><span class="o">.</span><span class="n">cluster_name</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_cluster_metadata</span><span class="o">.</span><span class="n">cluster_name</span><span class="p">):</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;Cannot change the configuration of the running cluster </span><span class="si">%s</span><span class="s1">. &#39;</span>
<span class="s1">&#39;Existing is </span><span class="si">%s</span><span class="s1">, desired is </span><span class="si">%s</span><span class="s1">.&#39;</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">default_cluster_metadata</span><span class="o">.</span><span class="n">cluster_name</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">default_cluster_metadata</span><span class="p">,</span>
<span class="n">meta</span><span class="p">)</span>
<span class="n">meta</span><span class="o">.</span><span class="n">reset_name</span><span class="p">()</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;To avoid conflict, issuing a new cluster name </span><span class="si">%s</span><span class="s1"> &#39;</span>
<span class="s1">&#39;for a new cluster.&#39;</span><span class="p">,</span>
<span class="n">meta</span><span class="o">.</span><span class="n">cluster_name</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span>
<span class="s1">&#39;A cluster_identifier should be Optional[Union[str, &#39;</span>
<span class="s1">&#39;beam.Pipeline, ClusterMetadata], instead </span><span class="si">%s</span><span class="s1"> was given.&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="p">(</span><span class="n">cluster_identifier</span><span class="p">))</span>
<span class="k">return</span> <span class="n">meta</span>
<span class="k">def</span> <span class="nf">_cleanup</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dcm</span><span class="p">:</span> <span class="n">DataprocClusterManager</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">dcm</span><span class="o">.</span><span class="n">cleanup</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dataproc_cluster_managers</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="n">dcm</span><span class="o">.</span><span class="n">cluster_metadata</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">master_urls</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="n">dcm</span><span class="o">.</span><span class="n">cluster_metadata</span><span class="o">.</span><span class="n">master_url</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">for</span> <span class="n">p</span> <span class="ow">in</span> <span class="n">dcm</span><span class="o">.</span><span class="n">pipelines</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pipelines</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="n">p</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">if</span> <span class="n">dcm</span><span class="o">.</span><span class="n">cluster_metadata</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_cluster_metadata</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">default_cluster_metadata</span> <span class="o">=</span> <span class="kc">None</span></div>
<span class="c1"># Users can set options to guide how Interactive Beam works.</span>
<span class="c1"># Examples:</span>
<span class="c1"># ib.options.enable_recording_replay = False/True</span>
<span class="c1"># ib.options.recording_duration = &#39;1m&#39;</span>
<span class="c1"># ib.options.recordable_sources.add(SourceClass)</span>
<span class="c1"># Check the docstrings for detailed usages.</span>
<span class="n">options</span> <span class="o">=</span> <span class="n">Options</span><span class="p">()</span>
<span class="c1"># Users can introspect into recordings by using the recordings class.</span>
<span class="c1"># Examples:</span>
<span class="c1"># p = beam.Pipeline(InteractiveRunner())</span>
<span class="c1"># elems = p | beam.Create([1, 2, 3])</span>
<span class="c1"># ib.show(elems)</span>
<span class="c1"># ib.recordings.describe(p)</span>
<span class="n">recordings</span> <span class="o">=</span> <span class="n">Recordings</span><span class="p">()</span>
<span class="c1"># Users can interact with the clusters used by their environment.</span>
<span class="c1"># Examples:</span>
<span class="c1"># ib.clusters.describe(p)</span>
<span class="c1"># Check the docstrings for detailed usages.</span>
<span class="n">clusters</span> <span class="o">=</span> <span class="n">Clusters</span><span class="p">()</span>
<div class="viewcode-block" id="watch"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.watch">[docs]</a><span class="k">def</span> <span class="nf">watch</span><span class="p">(</span><span class="n">watchable</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Monitors a watchable.</span>
<span class="sd"> This allows Interactive Beam to implicitly pass on the information about the</span>
<span class="sd"> location of your pipeline definition.</span>
<span class="sd"> Current implementation mainly watches for PCollection variables defined in</span>
<span class="sd"> user code. A watchable can be a dictionary of variable metadata such as</span>
<span class="sd"> locals(), a str name of a module, a module object or an instance of a class.</span>
<span class="sd"> The variable can come from any scope even local variables in a method of a</span>
<span class="sd"> class defined in a module.</span>
<span class="sd"> Below are all valid::</span>
<span class="sd"> watch(__main__) # if import __main__ is already invoked</span>
<span class="sd"> watch(&#39;__main__&#39;) # does not require invoking import __main__ beforehand</span>
<span class="sd"> watch(self) # inside a class</span>
<span class="sd"> watch(SomeInstance()) # an instance of a class</span>
<span class="sd"> watch(locals()) # inside a function, watching local variables within</span>
<span class="sd"> If you write a Beam pipeline in the __main__ module directly, since the</span>
<span class="sd"> __main__ module is always watched, you don&#39;t have to instruct Interactive</span>
<span class="sd"> Beam. If your Beam pipeline is defined in some module other than __main__,</span>
<span class="sd"> such as inside a class function or a unit test, you can watch() the scope.</span>
<span class="sd"> For example::</span>
<span class="sd"> class Foo(object):</span>
<span class="sd"> def run_pipeline(self):</span>
<span class="sd"> with beam.Pipeline() as p:</span>
<span class="sd"> init_pcoll = p | &#39;Init Create&#39; &gt;&gt; beam.Create(range(10))</span>
<span class="sd"> watch(locals())</span>
<span class="sd"> return init_pcoll</span>
<span class="sd"> init_pcoll = Foo().run_pipeline()</span>
<span class="sd"> Interactive Beam caches init_pcoll for the first run.</span>
<span class="sd"> Then you can use::</span>
<span class="sd"> show(init_pcoll)</span>
<span class="sd"> To visualize data from init_pcoll once the pipeline is executed.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">watch</span><span class="p">(</span><span class="n">watchable</span><span class="p">)</span></div>
<div class="viewcode-block" id="show"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.show">[docs]</a><span class="nd">@progress_indicated</span>
<span class="k">def</span> <span class="nf">show</span><span class="p">(</span>
<span class="o">*</span><span class="n">pcolls</span><span class="p">,</span>
<span class="n">include_window_info</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">visualize_data</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">n</span><span class="o">=</span><span class="s1">&#39;inf&#39;</span><span class="p">,</span>
<span class="n">duration</span><span class="o">=</span><span class="s1">&#39;inf&#39;</span><span class="p">):</span>
<span class="c1"># type: (*Union[Dict[Any, PCollection], Iterable[PCollection], PCollection], bool, bool, Union[int, str], Union[int, str]) -&gt; None # noqa: F821</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Shows given PCollections in an interactive exploratory way if used within</span>
<span class="sd"> a notebook, or prints the head of the sampled data if used within an ipython shell.</span>
<span class="sd"> Noop if used in a non-interactive environment.</span>
<span class="sd"> Args:</span>
<span class="sd"> include_window_info: (optional) if True, windowing information of the</span>
<span class="sd"> data will be visualized too. Default is false.</span>
<span class="sd"> visualize_data: (optional) by default, the visualization contains data</span>
<span class="sd"> tables rendering data from given pcolls separately as if they are</span>
<span class="sd"> converted into dataframes. If visualize_data is True, there will be a</span>
<span class="sd"> more dive-in widget and statistical overview widget of the data.</span>
<span class="sd"> Otherwise, those 2 data visualization widgets will not be displayed.</span>
<span class="sd"> n: (optional) max number of elements to visualize. Default &#39;inf&#39;.</span>
<span class="sd"> duration: (optional) max duration of elements to read in integer seconds or</span>
<span class="sd"> a string duration. Default &#39;inf&#39;.</span>
<span class="sd"> The given pcolls can be a dictionary of PCollections (as values), an</span>
<span class="sd"> iterable of PCollections, or plain PCollection values.</span>
<span class="sd"> The user can specify either the max number of elements with `n` to read</span>
<span class="sd"> or the maximum duration of elements to read with `duration`. When a limiter is</span>
<span class="sd"> not supplied, it is assumed to be infinite.</span>
<span class="sd"> By default, the visualization contains data tables rendering data from given</span>
<span class="sd"> pcolls separately as if they are converted into dataframes. If visualize_data</span>
<span class="sd"> is True, there will be a more dive-in widget and statistical overview widget</span>
<span class="sd"> of the data. Otherwise, those 2 data visualization widgets will not be</span>
<span class="sd"> displayed.</span>
<span class="sd"> Ad hoc builds a pipeline fragment including only transforms that are</span>
<span class="sd"> necessary to produce data for given PCollections pcolls, runs the pipeline</span>
<span class="sd"> fragment to compute data for those pcolls and then visualizes the data.</span>
<span class="sd"> The function is always blocking. If used within a notebook, the data</span>
<span class="sd"> visualized might be dynamically updated before the function returns as more</span>
<span class="sd"> and more data could get processed and emitted when the pipeline fragment</span>
<span class="sd"> is being executed. If used within an ipython shell, there will be no dynamic</span>
<span class="sd"> plotting but a static plot at the end of pipeline fragment execution.</span>
<span class="sd"> The PCollections given must belong to the same pipeline.</span>
<span class="sd"> For example::</span>
<span class="sd"> p = beam.Pipeline(InteractiveRunner())</span>
<span class="sd"> init = p | &#39;Init&#39; &gt;&gt; beam.Create(range(1000))</span>
<span class="sd"> square = init | &#39;Square&#39; &gt;&gt; beam.Map(lambda x: x * x)</span>
<span class="sd"> cube = init | &#39;Cube&#39; &gt;&gt; beam.Map(lambda x: x ** 3)</span>
<span class="sd"> # Below builds a pipeline fragment from the defined pipeline `p` that</span>
<span class="sd"> # contains only applied transforms of `Init` and `Square`. Then the</span>
<span class="sd"> # interactive runner runs the pipeline fragment implicitly to compute data</span>
<span class="sd"> # represented by PCollection `square` and visualizes it.</span>
<span class="sd"> show(square)</span>
<span class="sd"> # This is equivalent to `show(square)` because `square` depends on `init`</span>
<span class="sd"> # and `init` is included in the pipeline fragment and computed anyway.</span>
<span class="sd"> show(init, square)</span>
<span class="sd"> # Below is similar to running `p.run()`. It computes data for both</span>
<span class="sd"> # PCollection `square` and PCollection `cube`, then visualizes them.</span>
<span class="sd"> show(square, cube)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">flatten_pcolls</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">pcoll_container</span> <span class="ow">in</span> <span class="n">pcolls</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pcoll_container</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span>
<span class="n">flatten_pcolls</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">pcoll_container</span><span class="o">.</span><span class="n">values</span><span class="p">())</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pcoll_container</span><span class="p">,</span> <span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">,</span> <span class="n">DeferredBase</span><span class="p">)):</span>
<span class="n">flatten_pcolls</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">pcoll_container</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">flatten_pcolls</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="nb">iter</span><span class="p">(</span><span class="n">pcoll_container</span><span class="p">))</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;The given pcoll </span><span class="si">%s</span><span class="s1"> is not a dict, an iterable or a PCollection.&#39;</span> <span class="o">%</span>
<span class="n">pcoll_container</span><span class="p">)</span>
<span class="c1"># Iterate through the given PCollections and convert any deferred DataFrames</span>
<span class="c1"># or Series into PCollections.</span>
<span class="n">pcolls</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="c1"># The element type is used to help visualize the given PCollection. For the</span>
<span class="c1"># deferred DataFrame/Series case it is the proxy of the frame.</span>
<span class="n">element_types</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">for</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">flatten_pcolls</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">DeferredBase</span><span class="p">):</span>
<span class="n">pcoll</span><span class="p">,</span> <span class="n">element_type</span> <span class="o">=</span> <span class="n">deferred_df_to_pcollection</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)</span>
<span class="n">watch</span><span class="p">({</span><span class="s1">&#39;anonymous_pcollection_</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">id</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)):</span> <span class="n">pcoll</span><span class="p">})</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">element_type</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span>
<span class="n">element_types</span><span class="p">[</span><span class="n">pcoll</span><span class="p">]</span> <span class="o">=</span> <span class="n">element_type</span>
<span class="n">pcolls</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">),</span> <span class="p">(</span>
<span class="s1">&#39;</span><span class="si">{}</span><span class="s1"> is not an apache_beam.pvalue.PCollection.&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">pcoll</span><span class="p">))</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="n">pcolls</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">,</span> <span class="p">(</span>
<span class="s1">&#39;Need at least 1 PCollection to show data visualization.&#39;</span><span class="p">)</span>
<span class="n">pcoll_pipeline</span> <span class="o">=</span> <span class="nb">next</span><span class="p">(</span><span class="nb">iter</span><span class="p">(</span><span class="n">pcolls</span><span class="p">))</span><span class="o">.</span><span class="n">pipeline</span>
<span class="n">user_pipeline</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">user_pipeline</span><span class="p">(</span><span class="n">pcoll_pipeline</span><span class="p">)</span>
<span class="c1"># Possibly showing a PCollection defined in a local scope that is not</span>
<span class="c1"># explicitly watched. Ad hoc watch it though it&#39;s a little late.</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">user_pipeline</span><span class="p">:</span>
<span class="n">watch</span><span class="p">({</span><span class="s1">&#39;anonymous_pipeline_</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">id</span><span class="p">(</span><span class="n">pcoll_pipeline</span><span class="p">)):</span> <span class="n">pcoll_pipeline</span><span class="p">})</span>
<span class="n">user_pipeline</span> <span class="o">=</span> <span class="n">pcoll_pipeline</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">n</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">assert</span> <span class="n">n</span> <span class="o">==</span> <span class="s1">&#39;inf&#39;</span><span class="p">,</span> <span class="p">(</span>
<span class="s1">&#39;Currently only the string </span><span class="se">\&#39;</span><span class="s1">inf</span><span class="se">\&#39;</span><span class="s1"> is supported. This denotes reading &#39;</span>
<span class="s1">&#39;elements until the recording is stopped via a kernel interrupt.&#39;</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">n</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span>
<span class="k">assert</span> <span class="n">n</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">,</span> <span class="s1">&#39;n needs to be positive or the string </span><span class="se">\&#39;</span><span class="s1">inf</span><span class="se">\&#39;</span><span class="s1">&#39;</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">duration</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span>
<span class="k">assert</span> <span class="n">duration</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">,</span> <span class="p">(</span><span class="s1">&#39;duration needs to be positive, a duration string, &#39;</span>
<span class="s1">&#39;or the string </span><span class="se">\&#39;</span><span class="s1">inf</span><span class="se">\&#39;</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">n</span> <span class="o">==</span> <span class="s1">&#39;inf&#39;</span><span class="p">:</span>
<span class="n">n</span> <span class="o">=</span> <span class="nb">float</span><span class="p">(</span><span class="s1">&#39;inf&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">duration</span> <span class="o">==</span> <span class="s1">&#39;inf&#39;</span><span class="p">:</span>
<span class="n">duration</span> <span class="o">=</span> <span class="nb">float</span><span class="p">(</span><span class="s1">&#39;inf&#39;</span><span class="p">)</span>
<span class="n">previously_computed_pcolls</span> <span class="o">=</span> <span class="p">{</span>
<span class="n">pcoll</span>
<span class="k">for</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">pcolls</span> <span class="k">if</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">computed_pcollections</span>
<span class="p">}</span>
<span class="k">for</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">previously_computed_pcolls</span><span class="p">:</span>
<span class="n">visualize_computed_pcoll</span><span class="p">(</span>
<span class="n">find_pcoll_name</span><span class="p">(</span><span class="n">pcoll</span><span class="p">),</span>
<span class="n">pcoll</span><span class="p">,</span>
<span class="n">n</span><span class="p">,</span>
<span class="n">duration</span><span class="p">,</span>
<span class="n">include_window_info</span><span class="o">=</span><span class="n">include_window_info</span><span class="p">,</span>
<span class="n">display_facets</span><span class="o">=</span><span class="n">visualize_data</span><span class="p">)</span>
<span class="n">pcolls</span> <span class="o">=</span> <span class="n">pcolls</span> <span class="o">-</span> <span class="n">previously_computed_pcolls</span>
<span class="n">recording_manager</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">get_recording_manager</span><span class="p">(</span>
<span class="n">user_pipeline</span><span class="p">,</span> <span class="n">create_if_absent</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="n">recording</span> <span class="o">=</span> <span class="n">recording_manager</span><span class="o">.</span><span class="n">record</span><span class="p">(</span><span class="n">pcolls</span><span class="p">,</span> <span class="n">max_n</span><span class="o">=</span><span class="n">n</span><span class="p">,</span> <span class="n">max_duration</span><span class="o">=</span><span class="n">duration</span><span class="p">)</span>
<span class="c1"># Catch a KeyboardInterrupt to gracefully cancel the recording and</span>
<span class="c1"># visualizations.</span>
<span class="k">try</span><span class="p">:</span>
<span class="c1"># If in notebook, static plotting computed pcolls as computation is done.</span>
<span class="k">if</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">is_in_notebook</span><span class="p">:</span>
<span class="k">for</span> <span class="n">stream</span> <span class="ow">in</span> <span class="n">recording</span><span class="o">.</span><span class="n">computed</span><span class="p">()</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="n">visualize</span><span class="p">(</span>
<span class="n">stream</span><span class="p">,</span>
<span class="n">include_window_info</span><span class="o">=</span><span class="n">include_window_info</span><span class="p">,</span>
<span class="n">display_facets</span><span class="o">=</span><span class="n">visualize_data</span><span class="p">,</span>
<span class="n">element_type</span><span class="o">=</span><span class="n">element_types</span><span class="p">[</span><span class="n">stream</span><span class="o">.</span><span class="n">pcoll</span><span class="p">])</span>
<span class="k">elif</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">is_in_ipython</span><span class="p">:</span>
<span class="k">for</span> <span class="n">stream</span> <span class="ow">in</span> <span class="n">recording</span><span class="o">.</span><span class="n">computed</span><span class="p">()</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="n">visualize</span><span class="p">(</span>
<span class="n">stream</span><span class="p">,</span>
<span class="n">include_window_info</span><span class="o">=</span><span class="n">include_window_info</span><span class="p">,</span>
<span class="n">element_type</span><span class="o">=</span><span class="n">element_types</span><span class="p">[</span><span class="n">stream</span><span class="o">.</span><span class="n">pcoll</span><span class="p">])</span>
<span class="k">if</span> <span class="n">recording</span><span class="o">.</span><span class="n">is_computed</span><span class="p">():</span>
<span class="k">return</span>
<span class="c1"># If in notebook, dynamic plotting as computation goes.</span>
<span class="k">if</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">is_in_notebook</span><span class="p">:</span>
<span class="k">for</span> <span class="n">stream</span> <span class="ow">in</span> <span class="n">recording</span><span class="o">.</span><span class="n">uncomputed</span><span class="p">()</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="n">visualize</span><span class="p">(</span>
<span class="n">stream</span><span class="p">,</span>
<span class="n">dynamic_plotting_interval</span><span class="o">=</span><span class="mi">1</span><span class="p">,</span>
<span class="n">include_window_info</span><span class="o">=</span><span class="n">include_window_info</span><span class="p">,</span>
<span class="n">display_facets</span><span class="o">=</span><span class="n">visualize_data</span><span class="p">,</span>
<span class="n">element_type</span><span class="o">=</span><span class="n">element_types</span><span class="p">[</span><span class="n">stream</span><span class="o">.</span><span class="n">pcoll</span><span class="p">])</span>
<span class="c1"># Invoke wait_until_finish to ensure the blocking nature of this API without</span>
<span class="c1"># relying on the run to be blocking.</span>
<span class="n">recording</span><span class="o">.</span><span class="n">wait_until_finish</span><span class="p">()</span>
<span class="c1"># If just in ipython shell, plotting once when the computation is completed.</span>
<span class="k">if</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">is_in_ipython</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">is_in_notebook</span><span class="p">:</span>
<span class="k">for</span> <span class="n">stream</span> <span class="ow">in</span> <span class="n">recording</span><span class="o">.</span><span class="n">computed</span><span class="p">()</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="n">visualize</span><span class="p">(</span><span class="n">stream</span><span class="p">,</span> <span class="n">include_window_info</span><span class="o">=</span><span class="n">include_window_info</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">KeyboardInterrupt</span><span class="p">:</span>
<span class="k">if</span> <span class="n">recording</span><span class="p">:</span>
<span class="n">recording</span><span class="o">.</span><span class="n">cancel</span><span class="p">()</span></div>
<div class="viewcode-block" id="collect"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.collect">[docs]</a><span class="nd">@progress_indicated</span>
<span class="k">def</span> <span class="nf">collect</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">n</span><span class="o">=</span><span class="s1">&#39;inf&#39;</span><span class="p">,</span> <span class="n">duration</span><span class="o">=</span><span class="s1">&#39;inf&#39;</span><span class="p">,</span> <span class="n">include_window_info</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Materializes the elements from a PCollection into a DataFrame.</span>
<span class="sd"> This reads each element from file and reads only the amount that it needs</span>
<span class="sd"> into memory. The user can specify either the max number of elements to read</span>
<span class="sd"> or the maximum duration of elements to read. When a limiter is not supplied,</span>
<span class="sd"> it is assumed to be infinite.</span>
<span class="sd"> Args:</span>
<span class="sd"> n: (optional) max number of elements to read. Default &#39;inf&#39;.</span>
<span class="sd"> duration: (optional) max duration of elements to read in integer seconds or</span>
<span class="sd"> a string duration. Default &#39;inf&#39;.</span>
<span class="sd"> include_window_info: (optional) if True, appends the windowing information</span>
<span class="sd"> to each row. Default False.</span>
<span class="sd"> For example::</span>
<span class="sd"> p = beam.Pipeline(InteractiveRunner())</span>
<span class="sd"> init = p | &#39;Init&#39; &gt;&gt; beam.Create(range(10))</span>
<span class="sd"> square = init | &#39;Square&#39; &gt;&gt; beam.Map(lambda x: x * x)</span>
<span class="sd"> # Run the pipeline and bring the PCollection into memory as a DataFrame.</span>
<span class="sd"> in_memory_square = collect(square, n=5)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Remember the element type so we can make an informed decision on how to</span>
<span class="c1"># collect the result in elements_to_df.</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">DeferredBase</span><span class="p">):</span>
<span class="c1"># Get the proxy so we can get the output shape of the DataFrame.</span>
<span class="n">pcoll</span><span class="p">,</span> <span class="n">element_type</span> <span class="o">=</span> <span class="n">deferred_df_to_pcollection</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)</span>
<span class="n">watch</span><span class="p">({</span><span class="s1">&#39;anonymous_pcollection_</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">id</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)):</span> <span class="n">pcoll</span><span class="p">})</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">element_type</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">),</span> <span class="p">(</span>
<span class="s1">&#39;</span><span class="si">{}</span><span class="s1"> is not an apache_beam.pvalue.PCollection.&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">pcoll</span><span class="p">))</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">n</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">assert</span> <span class="n">n</span> <span class="o">==</span> <span class="s1">&#39;inf&#39;</span><span class="p">,</span> <span class="p">(</span>
<span class="s1">&#39;Currently only the string </span><span class="se">\&#39;</span><span class="s1">inf</span><span class="se">\&#39;</span><span class="s1"> is supported. This denotes reading &#39;</span>
<span class="s1">&#39;elements until the recording is stopped via a kernel interrupt.&#39;</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">n</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span>
<span class="k">assert</span> <span class="n">n</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">,</span> <span class="s1">&#39;n needs to be positive or the string </span><span class="se">\&#39;</span><span class="s1">inf</span><span class="se">\&#39;</span><span class="s1">&#39;</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">duration</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span>
<span class="k">assert</span> <span class="n">duration</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">,</span> <span class="p">(</span><span class="s1">&#39;duration needs to be positive, a duration string, &#39;</span>
<span class="s1">&#39;or the string </span><span class="se">\&#39;</span><span class="s1">inf</span><span class="se">\&#39;</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">n</span> <span class="o">==</span> <span class="s1">&#39;inf&#39;</span><span class="p">:</span>
<span class="n">n</span> <span class="o">=</span> <span class="nb">float</span><span class="p">(</span><span class="s1">&#39;inf&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">duration</span> <span class="o">==</span> <span class="s1">&#39;inf&#39;</span><span class="p">:</span>
<span class="n">duration</span> <span class="o">=</span> <span class="nb">float</span><span class="p">(</span><span class="s1">&#39;inf&#39;</span><span class="p">)</span>
<span class="n">user_pipeline</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">user_pipeline</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span>
<span class="c1"># Possibly collecting a PCollection defined in a local scope that is not</span>
<span class="c1"># explicitly watched. Watch it ad hoc, even though it&#39;s a little late.</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">user_pipeline</span><span class="p">:</span>
<span class="n">watch</span><span class="p">({</span><span class="s1">&#39;anonymous_pipeline_</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">id</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)):</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="p">})</span>
<span class="n">user_pipeline</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span>
<span class="n">recording_manager</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">get_recording_manager</span><span class="p">(</span>
<span class="n">user_pipeline</span><span class="p">,</span> <span class="n">create_if_absent</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="c1"># If already computed, directly read the stream and return.</span>
<span class="k">if</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">computed_pcollections</span><span class="p">:</span>
<span class="n">pcoll_name</span> <span class="o">=</span> <span class="n">find_pcoll_name</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)</span>
<span class="n">elements</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span>
<span class="n">recording_manager</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="n">pcoll_name</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">,</span> <span class="n">n</span><span class="p">,</span> <span class="n">duration</span><span class="p">)</span><span class="o">.</span><span class="n">read</span><span class="p">())</span>
<span class="k">return</span> <span class="n">elements_to_df</span><span class="p">(</span>
<span class="n">elements</span><span class="p">,</span>
<span class="n">include_window_info</span><span class="o">=</span><span class="n">include_window_info</span><span class="p">,</span>
<span class="n">element_type</span><span class="o">=</span><span class="n">element_type</span><span class="p">)</span>
<span class="n">recording</span> <span class="o">=</span> <span class="n">recording_manager</span><span class="o">.</span><span class="n">record</span><span class="p">([</span><span class="n">pcoll</span><span class="p">],</span> <span class="n">max_n</span><span class="o">=</span><span class="n">n</span><span class="p">,</span> <span class="n">max_duration</span><span class="o">=</span><span class="n">duration</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">elements</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">recording</span><span class="o">.</span><span class="n">stream</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)</span><span class="o">.</span><span class="n">read</span><span class="p">())</span>
<span class="k">except</span> <span class="ne">KeyboardInterrupt</span><span class="p">:</span>
<span class="n">recording</span><span class="o">.</span><span class="n">cancel</span><span class="p">()</span>
<span class="k">return</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">()</span>
<span class="k">if</span> <span class="n">n</span> <span class="o">==</span> <span class="nb">float</span><span class="p">(</span><span class="s1">&#39;inf&#39;</span><span class="p">):</span>
<span class="n">n</span> <span class="o">=</span> <span class="kc">None</span>
<span class="c1"># Collecting DataFrames may have a length &gt; n, so slice again to be sure. Note</span>
<span class="c1"># that array[:None] returns everything.</span>
<span class="k">return</span> <span class="n">elements_to_df</span><span class="p">(</span>
<span class="n">elements</span><span class="p">,</span>
<span class="n">include_window_info</span><span class="o">=</span><span class="n">include_window_info</span><span class="p">,</span>
<span class="n">element_type</span><span class="o">=</span><span class="n">element_type</span><span class="p">)[:</span><span class="n">n</span><span class="p">]</span></div>
<div class="viewcode-block" id="show_graph"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.show_graph">[docs]</a><span class="nd">@progress_indicated</span>
<span class="k">def</span> <span class="nf">show_graph</span><span class="p">(</span><span class="n">pipeline</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Shows the current pipeline shape of a given Beam pipeline as a DAG.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">pipeline_graph</span><span class="o">.</span><span class="n">PipelineGraph</span><span class="p">(</span><span class="n">pipeline</span><span class="p">)</span><span class="o">.</span><span class="n">display_graph</span><span class="p">()</span></div>
<div class="viewcode-block" id="evict_recorded_data"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.interactive_beam.html#apache_beam.runners.interactive.interactive_beam.evict_recorded_data">[docs]</a><span class="k">def</span> <span class="nf">evict_recorded_data</span><span class="p">(</span><span class="n">pipeline</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Forcefully evicts all recorded replayable data for the given pipeline. If</span>
<span class="sd"> no pipeline is specified, evicts for all user defined pipelines.</span>
<span class="sd"> Once invoked, Interactive Beam will record new data based on the guidance of</span>
<span class="sd"> options the next time it evaluates/visualizes PCollections or runs pipelines.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.options</span> <span class="kn">import</span> <span class="n">capture_control</span>
<span class="n">capture_control</span><span class="o">.</span><span class="n">evict_captured_data</span><span class="p">(</span><span class="n">pipeline</span><span class="p">)</span></div>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
&copy; Copyright
</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script type="text/javascript">
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>