<!-- blob: c60b4239585f31930278df33485a47ba31cdd066 [file] [log] [blame] -->
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>apache_beam.runners.interactive.utils &mdash; Apache Beam 2.47.0 documentation</title>
<script type="text/javascript" src="../../../../_static/js/modernizr.min.js"></script>
<script type="text/javascript" id="documentation_options" data-url_root="../../../../" src="../../../../_static/documentation_options.js"></script>
<script type="text/javascript" src="../../../../_static/jquery.js"></script>
<script type="text/javascript" src="../../../../_static/underscore.js"></script>
<script type="text/javascript" src="../../../../_static/doctools.js"></script>
<script type="text/javascript" src="../../../../_static/language_data.js"></script>
<script async="async" type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
<script type="text/javascript" src="../../../../_static/js/theme.js"></script>
<link rel="stylesheet" href="../../../../_static/css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../../../_static/pygments.css" type="text/css" />
<link rel="index" title="Index" href="../../../../genindex.html" />
<link rel="search" title="Search" href="../../../../search.html" />
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../../index.html" class="icon icon-home"> Apache Beam
</a>
<div class="version">
2.47.0
</div>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.ml.html">apache_beam.ml package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.testing.html">apache_beam.testing package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.utils.html">apache_beam.utils package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.yaml.html">apache_beam.yaml package</a></li>
</ul>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../../index.html">Module code</a> &raquo;</li>
<li>apache_beam.runners.interactive.utils</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.runners.interactive.utils</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Utilities to be used in Interactive Beam.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="kn">import</span> <span class="nn">functools</span>
<span class="kn">import</span> <span class="nn">hashlib</span>
<span class="kn">import</span> <span class="nn">importlib</span>
<span class="kn">import</span> <span class="nn">json</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Any</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Dict</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Tuple</span>
<span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span>
<span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span>
<span class="kn">from</span> <span class="nn">apache_beam.dataframe.convert</span> <span class="kn">import</span> <span class="n">to_pcollection</span>
<span class="kn">from</span> <span class="nn">apache_beam.dataframe.frame_base</span> <span class="kn">import</span> <span class="n">DeferredBase</span>
<span class="kn">from</span> <span class="nn">apache_beam.internal.gcp</span> <span class="kn">import</span> <span class="n">auth</span>
<span class="kn">from</span> <span class="nn">apache_beam.internal.http_client</span> <span class="kn">import</span> <span class="n">get_new_http</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.internal.clients</span> <span class="kn">import</span> <span class="n">storage</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="kn">import</span> <span class="n">PipelineOptions</span>
<span class="kn">from</span> <span class="nn">apache_beam.pipeline</span> <span class="kn">import</span> <span class="n">Pipeline</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability.api</span> <span class="kn">import</span> <span class="n">beam_runner_api_pb2</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.caching.cacheable</span> <span class="kn">import</span> <span class="n">Cacheable</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.caching.cacheable</span> <span class="kn">import</span> <span class="n">CacheKey</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.caching.expression_cache</span> <span class="kn">import</span> <span class="n">ExpressionCache</span>
<span class="kn">from</span> <span class="nn">apache_beam.testing.test_stream</span> <span class="kn">import</span> <span class="n">WindowedValueHolder</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints.schemas</span> <span class="kn">import</span> <span class="n">named_fields_from_element_type</span>
<span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>
<span class="c1"># Add line breaks to the IPythonLogHandler&#39;s HTML output.</span>
<span class="n">_INTERACTIVE_LOG_STYLE</span> <span class="o">=</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> &lt;style&gt;</span>
<span class="s2"> div.alert {</span>
<span class="s2"> white-space: pre-line;</span>
<span class="s2"> }</span>
<span class="s2"> &lt;/style&gt;</span>
<span class="s2">&quot;&quot;&quot;</span>
<div class="viewcode-block" id="to_element_list"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.to_element_list">[docs]</a><span class="k">def</span> <span class="nf">to_element_list</span><span class="p">(</span>
<span class="n">reader</span><span class="p">,</span> <span class="c1"># type: Generator[Union[beam_runner_api_pb2.TestStreamPayload.Event, WindowedValueHolder]] # noqa: F821</span>
<span class="n">coder</span><span class="p">,</span> <span class="c1"># type: Coder # noqa: F821</span>
<span class="n">include_window_info</span><span class="p">,</span> <span class="c1"># type: bool</span>
<span class="n">n</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: int</span>
<span class="n">include_time_events</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="c1"># type: bool</span>
<span class="p">):</span>
<span class="c1"># type: (...) -&gt; List[WindowedValue] # noqa: F821</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns an iterator that properly decodes the elements from the reader.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Defining a generator like this makes it easier to limit the count of</span>
<span class="c1"># elements read. Otherwise, the count limit would need to be duplicated.</span>
<span class="k">def</span> <span class="nf">elements</span><span class="p">():</span>
<span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">reader</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">e</span><span class="p">,</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">TestStreamPayload</span><span class="o">.</span><span class="n">Event</span><span class="p">):</span>
<span class="k">if</span> <span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">HasField</span><span class="p">(</span><span class="s1">&#39;watermark_event&#39;</span><span class="p">)</span> <span class="ow">or</span>
<span class="n">e</span><span class="o">.</span><span class="n">HasField</span><span class="p">(</span><span class="s1">&#39;processing_time_event&#39;</span><span class="p">)):</span>
<span class="k">if</span> <span class="n">include_time_events</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">e</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">for</span> <span class="n">tv</span> <span class="ow">in</span> <span class="n">e</span><span class="o">.</span><span class="n">element_event</span><span class="o">.</span><span class="n">elements</span><span class="p">:</span>
<span class="n">decoded</span> <span class="o">=</span> <span class="n">coder</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="n">tv</span><span class="o">.</span><span class="n">encoded_element</span><span class="p">)</span>
<span class="k">yield</span> <span class="p">(</span>
<span class="n">decoded</span><span class="o">.</span><span class="n">windowed_value</span>
<span class="k">if</span> <span class="n">include_window_info</span> <span class="k">else</span> <span class="n">decoded</span><span class="o">.</span><span class="n">windowed_value</span><span class="o">.</span><span class="n">value</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">e</span><span class="p">,</span> <span class="n">WindowedValueHolder</span><span class="p">):</span>
<span class="k">yield</span> <span class="p">(</span>
<span class="n">e</span><span class="o">.</span><span class="n">windowed_value</span> <span class="k">if</span> <span class="n">include_window_info</span> <span class="k">else</span> <span class="n">e</span><span class="o">.</span><span class="n">windowed_value</span><span class="o">.</span><span class="n">value</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">e</span>
<span class="c1"># Because we can yield multiple elements from a single TestStreamFileRecord,</span>
<span class="c1"># we have to limit the count here to ensure that `n` is fulfilled.</span>
<span class="n">count</span> <span class="o">=</span> <span class="mi">0</span>
<span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">elements</span><span class="p">():</span>
<span class="k">if</span> <span class="n">n</span> <span class="ow">and</span> <span class="n">count</span> <span class="o">&gt;=</span> <span class="n">n</span><span class="p">:</span>
<span class="k">break</span>
<span class="k">yield</span> <span class="n">e</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">e</span><span class="p">,</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">TestStreamPayload</span><span class="o">.</span><span class="n">Event</span><span class="p">):</span>
<span class="n">count</span> <span class="o">+=</span> <span class="mi">1</span></div>
<div class="viewcode-block" id="elements_to_df"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.elements_to_df">[docs]</a><span class="k">def</span> <span class="nf">elements_to_df</span><span class="p">(</span><span class="n">elements</span><span class="p">,</span> <span class="n">include_window_info</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">element_type</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="c1"># type: (List[WindowedValue], bool, Any) -&gt; DataFrame # noqa: F821</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Parses the given elements into a Dataframe.</span>
<span class="sd"> If the elements are a list of WindowedValues, then it will break out the</span>
<span class="sd"> elements into their own DataFrame and return it. If include_window_info is</span>
<span class="sd"> True, then it will concatenate the windowing information onto the elements</span>
<span class="sd"> DataFrame.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">columns_names</span> <span class="o">=</span> <span class="p">[</span>
<span class="n">name</span> <span class="k">for</span> <span class="n">name</span><span class="p">,</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">named_fields_from_element_type</span><span class="p">(</span><span class="n">element_type</span><span class="p">)</span>
<span class="p">]</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="n">columns_names</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">rows</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">windowed_info</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">elements</span><span class="p">:</span>
<span class="n">rows</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">value</span><span class="p">)</span>
<span class="k">if</span> <span class="n">include_window_info</span><span class="p">:</span>
<span class="n">windowed_info</span><span class="o">.</span><span class="n">append</span><span class="p">([</span><span class="n">e</span><span class="o">.</span><span class="n">timestamp</span><span class="o">.</span><span class="n">micros</span><span class="p">,</span> <span class="n">e</span><span class="o">.</span><span class="n">windows</span><span class="p">,</span> <span class="n">e</span><span class="o">.</span><span class="n">pane_info</span><span class="p">])</span>
<span class="n">using_dataframes</span> <span class="o">=</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element_type</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span>
<span class="n">using_series</span> <span class="o">=</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element_type</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">)</span>
<span class="k">if</span> <span class="n">using_dataframes</span> <span class="ow">or</span> <span class="n">using_series</span><span class="p">:</span>
<span class="n">rows_df</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">concat</span><span class="p">(</span><span class="n">rows</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">rows_df</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">(</span><span class="n">rows</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="n">columns_names</span><span class="p">)</span>
<span class="k">if</span> <span class="n">include_window_info</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">using_series</span><span class="p">:</span>
<span class="n">windowed_info_df</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">(</span>
<span class="n">windowed_info</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;event_time&#39;</span><span class="p">,</span> <span class="s1">&#39;windows&#39;</span><span class="p">,</span> <span class="s1">&#39;pane_info&#39;</span><span class="p">])</span>
<span class="n">final_df</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">concat</span><span class="p">([</span><span class="n">rows_df</span><span class="p">,</span> <span class="n">windowed_info_df</span><span class="p">],</span> <span class="n">axis</span><span class="o">=</span><span class="mi">1</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">final_df</span> <span class="o">=</span> <span class="n">rows_df</span>
<span class="k">return</span> <span class="n">final_df</span></div>
<div class="viewcode-block" id="register_ipython_log_handler"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.register_ipython_log_handler">[docs]</a><span class="k">def</span> <span class="nf">register_ipython_log_handler</span><span class="p">():</span>
<span class="c1"># type: () -&gt; None</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Adds the IPython handler to a dummy parent logger (named</span>
<span class="sd"> &#39;apache_beam.runners.interactive&#39;) of all interactive modules&#39; loggers so that</span>
<span class="sd"> if is_in_notebook, logging displays the logs as HTML in frontends.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># apache_beam.runners.interactive is not a module, thus this &quot;root&quot; logger is</span>
<span class="c1"># a dummy one created to hold the IPython log handler. When children loggers</span>
<span class="c1"># have propagate as True (by default) and logging level as NOTSET (by default,</span>
<span class="c1"># so the &quot;root&quot; logger&#39;s logging level takes effect), the IPython log handler</span>
<span class="c1"># will be triggered at the &quot;root&quot;&#39;s own logging level. And if a child logger</span>
<span class="c1"># sets its logging level, it can take control back.</span>
<span class="n">interactive_root_logger</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="s1">&#39;apache_beam.runners.interactive&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">any</span><span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">h</span><span class="p">,</span> <span class="n">IPythonLogHandler</span><span class="p">)</span>
<span class="k">for</span> <span class="n">h</span> <span class="ow">in</span> <span class="n">interactive_root_logger</span><span class="o">.</span><span class="n">handlers</span><span class="p">):</span>
<span class="k">return</span>
<span class="n">interactive_root_logger</span><span class="o">.</span><span class="n">setLevel</span><span class="p">(</span><span class="n">logging</span><span class="o">.</span><span class="n">INFO</span><span class="p">)</span>
<span class="n">interactive_root_logger</span><span class="o">.</span><span class="n">addHandler</span><span class="p">(</span><span class="n">IPythonLogHandler</span><span class="p">())</span>
<span class="c1"># Disable the propagation so that logs emitted from interactive modules should</span>
<span class="c1"># only be handled by loggers and handlers defined within interactive packages.</span>
<span class="n">interactive_root_logger</span><span class="o">.</span><span class="n">propagate</span> <span class="o">=</span> <span class="kc">False</span></div>
<div class="viewcode-block" id="IPythonLogHandler"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.IPythonLogHandler">[docs]</a><span class="k">class</span> <span class="nc">IPythonLogHandler</span><span class="p">(</span><span class="n">logging</span><span class="o">.</span><span class="n">Handler</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A logging handler to display logs as HTML in IPython backed frontends.&quot;&quot;&quot;</span>
<span class="c1"># TODO(BEAM-7923): Switch to Google hosted CDN once</span>
<span class="c1"># https://code.google.com/archive/p/google-ajax-apis/issues/637 is resolved.</span>
<span class="n">log_template</span> <span class="o">=</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> &lt;link rel=&quot;stylesheet&quot; href=&quot;https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css&quot; integrity=&quot;sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh&quot; crossorigin=&quot;anonymous&quot;&gt;</span>
<span class="s2"> &lt;div class=&quot;alert alert-</span><span class="si">{level}</span><span class="s2">&quot;&gt;</span><span class="si">{msg}</span><span class="s2">&lt;/div&gt;&quot;&quot;&quot;</span>
<span class="n">logging_to_alert_level_map</span> <span class="o">=</span> <span class="p">{</span>
<span class="n">logging</span><span class="o">.</span><span class="n">CRITICAL</span><span class="p">:</span> <span class="s1">&#39;danger&#39;</span><span class="p">,</span>
<span class="n">logging</span><span class="o">.</span><span class="n">ERROR</span><span class="p">:</span> <span class="s1">&#39;danger&#39;</span><span class="p">,</span>
<span class="n">logging</span><span class="o">.</span><span class="n">WARNING</span><span class="p">:</span> <span class="s1">&#39;warning&#39;</span><span class="p">,</span>
<span class="n">logging</span><span class="o">.</span><span class="n">INFO</span><span class="p">:</span> <span class="s1">&#39;info&#39;</span><span class="p">,</span>
<span class="n">logging</span><span class="o">.</span><span class="n">DEBUG</span><span class="p">:</span> <span class="s1">&#39;dark&#39;</span><span class="p">,</span>
<span class="n">logging</span><span class="o">.</span><span class="n">NOTSET</span><span class="p">:</span> <span class="s1">&#39;light&#39;</span>
<span class="p">}</span>
<div class="viewcode-block" id="IPythonLogHandler.emit"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.IPythonLogHandler.emit">[docs]</a> <span class="k">def</span> <span class="nf">emit</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">record</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">html</span> <span class="kn">import</span> <span class="n">escape</span>
<span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">HTML</span>
<span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">display</span>
<span class="n">display</span><span class="p">(</span><span class="n">HTML</span><span class="p">(</span><span class="n">_INTERACTIVE_LOG_STYLE</span><span class="p">))</span>
<span class="n">display</span><span class="p">(</span>
<span class="n">HTML</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log_template</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">level</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">logging_to_alert_level_map</span><span class="p">[</span><span class="n">record</span><span class="o">.</span><span class="n">levelno</span><span class="p">],</span>
<span class="n">msg</span><span class="o">=</span><span class="n">escape</span><span class="p">(</span><span class="n">record</span><span class="o">.</span><span class="n">msg</span> <span class="o">%</span> <span class="n">record</span><span class="o">.</span><span class="n">args</span><span class="p">))))</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="k">pass</span> <span class="c1"># NOOP when dependencies are not available.</span></div></div>
<div class="viewcode-block" id="obfuscate"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.obfuscate">[docs]</a><span class="k">def</span> <span class="nf">obfuscate</span><span class="p">(</span><span class="o">*</span><span class="n">inputs</span><span class="p">):</span>
<span class="c1"># type: (*Any) -&gt; str</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Obfuscates any inputs into a hexadecimal string.&quot;&quot;&quot;</span>
<span class="n">str_inputs</span> <span class="o">=</span> <span class="p">[</span><span class="nb">str</span><span class="p">(</span><span class="nb">input</span><span class="p">)</span> <span class="k">for</span> <span class="nb">input</span> <span class="ow">in</span> <span class="n">inputs</span><span class="p">]</span>
<span class="n">merged_inputs</span> <span class="o">=</span> <span class="s1">&#39;_&#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">str_inputs</span><span class="p">)</span>
<span class="k">return</span> <span class="n">hashlib</span><span class="o">.</span><span class="n">md5</span><span class="p">(</span><span class="n">merged_inputs</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">hexdigest</span><span class="p">()</span></div>
<div class="viewcode-block" id="ProgressIndicator"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.ProgressIndicator">[docs]</a><span class="k">class</span> <span class="nc">ProgressIndicator</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;An indicator visualizing code execution in progress.&quot;&quot;&quot;</span>
<span class="c1"># TODO(BEAM-7923): Switch to Google hosted CDN once</span>
<span class="c1"># https://code.google.com/archive/p/google-ajax-apis/issues/637 is resolved.</span>
<span class="n">spinner_template</span> <span class="o">=</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> &lt;link rel=&quot;stylesheet&quot; href=&quot;https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css&quot; integrity=&quot;sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh&quot; crossorigin=&quot;anonymous&quot;&gt;</span>
<span class="s2"> &lt;div id=&quot;</span><span class="si">{id}</span><span class="s2">&quot;&gt;</span>
<span class="s2"> &lt;div class=&quot;spinner-border text-info&quot; role=&quot;status&quot;&gt;&lt;/div&gt;</span>
<span class="s2"> &lt;span class=&quot;text-info&quot;&gt;</span><span class="si">{text}</span><span class="s2">&lt;/span&gt;</span>
<span class="s2"> &lt;/div&gt;</span>
<span class="s2"> &quot;&quot;&quot;</span>
<span class="n">spinner_removal_template</span> <span class="o">=</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> $(&quot;#</span><span class="si">{id}</span><span class="s2">&quot;).remove();&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">enter_text</span><span class="p">,</span> <span class="n">exit_text</span><span class="p">):</span>
<span class="c1"># type: (str, str) -&gt; None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_id</span> <span class="o">=</span> <span class="s1">&#39;progress_indicator_</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">obfuscate</span><span class="p">(</span><span class="nb">id</span><span class="p">(</span><span class="bp">self</span><span class="p">)))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_enter_text</span> <span class="o">=</span> <span class="n">enter_text</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_exit_text</span> <span class="o">=</span> <span class="n">exit_text</span>
<span class="k">def</span> <span class="fm">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">HTML</span>
<span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">display</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span>
<span class="k">if</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">is_in_notebook</span><span class="p">:</span>
<span class="n">display</span><span class="p">(</span>
<span class="n">HTML</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">spinner_template</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="nb">id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_id</span><span class="p">,</span> <span class="n">text</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_enter_text</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">display</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_enter_text</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">ImportError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">error</span><span class="p">(</span>
<span class="s1">&#39;Please use interactive Beam features in an IPython&#39;</span>
<span class="s1">&#39;or notebook environment: </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">e</span><span class="p">)</span>
<span class="k">def</span> <span class="fm">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exc_type</span><span class="p">,</span> <span class="n">exc_value</span><span class="p">,</span> <span class="n">traceback</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">Javascript</span>
<span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">display</span>
<span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">display_javascript</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span>
<span class="k">if</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">is_in_notebook</span><span class="p">:</span>
<span class="n">script</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">spinner_removal_template</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_id</span><span class="p">)</span>
<span class="n">display_javascript</span><span class="p">(</span>
<span class="n">Javascript</span><span class="p">(</span>
<span class="n">ie</span><span class="o">.</span><span class="n">_JQUERY_WITH_DATATABLE_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">customized_script</span><span class="o">=</span><span class="n">script</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">display</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_exit_text</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">ImportError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">error</span><span class="p">(</span>
<span class="s1">&#39;Please use interactive Beam features in an IPython&#39;</span>
<span class="s1">&#39;or notebook environment: </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">e</span><span class="p">)</span></div>
<div class="viewcode-block" id="progress_indicated"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.progress_indicated">[docs]</a><span class="k">def</span> <span class="nf">progress_indicated</span><span class="p">(</span><span class="n">func</span><span class="p">):</span>
<span class="c1"># type: (Callable[..., Any]) -&gt; Callable[..., Any] # noqa: F821</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A decorator using a unique progress indicator as a context manager to</span>
<span class="sd"> execute the given function within.&quot;&quot;&quot;</span>
<span class="nd">@functools</span><span class="o">.</span><span class="n">wraps</span><span class="p">(</span><span class="n">func</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">run_within_progress_indicator</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">with</span> <span class="n">ProgressIndicator</span><span class="p">(</span><span class="sa">f</span><span class="s1">&#39;Processing... </span><span class="si">{</span><span class="n">func</span><span class="o">.</span><span class="vm">__name__</span><span class="si">}</span><span class="s1">&#39;</span><span class="p">,</span> <span class="s1">&#39;Done.&#39;</span><span class="p">):</span>
<span class="k">return</span> <span class="n">func</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">return</span> <span class="n">run_within_progress_indicator</span></div>
<div class="viewcode-block" id="as_json"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.as_json">[docs]</a><span class="k">def</span> <span class="nf">as_json</span><span class="p">(</span><span class="n">func</span><span class="p">):</span>
<span class="c1"># type: (Callable[..., Any]) -&gt; Callable[..., str] # noqa: F821</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A decorator convert python objects returned by callables to json</span>
<span class="sd"> string.</span>
<span class="sd"> The decorated function should always return an object parsable by json.dumps.</span>
<span class="sd"> If the object is not parsable, the str() of original object is returned</span>
<span class="sd"> instead.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">return_as_json</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">return_value</span> <span class="o">=</span> <span class="n">func</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">return</span> <span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="n">return_value</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="n">return_value</span><span class="p">)</span>
<span class="k">return</span> <span class="n">return_as_json</span></div>
<div class="viewcode-block" id="deferred_df_to_pcollection"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.deferred_df_to_pcollection">[docs]</a><span class="k">def</span> <span class="nf">deferred_df_to_pcollection</span><span class="p">(</span><span class="n">df</span><span class="p">):</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">DeferredBase</span><span class="p">),</span> <span class="s1">&#39;</span><span class="si">{}</span><span class="s1"> is not a DeferredBase&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">df</span><span class="p">)</span>
<span class="c1"># The proxy is used to output a DataFrame with the correct columns.</span>
<span class="c1">#</span>
<span class="c1"># TODO(https://github.com/apache/beam/issues/20577): Once type hints are</span>
<span class="c1"># implemented for pandas, use those instead of the proxy.</span>
<span class="n">cache</span> <span class="o">=</span> <span class="n">ExpressionCache</span><span class="p">()</span>
<span class="n">cache</span><span class="o">.</span><span class="n">replace_with_cached</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">_expr</span><span class="p">)</span>
<span class="n">proxy</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">_expr</span><span class="o">.</span><span class="n">proxy</span><span class="p">()</span>
<span class="k">return</span> <span class="n">to_pcollection</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">yield_elements</span><span class="o">=</span><span class="s1">&#39;pandas&#39;</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="nb">str</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">_expr</span><span class="p">)),</span> <span class="n">proxy</span></div>
<div class="viewcode-block" id="pcoll_by_name"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.pcoll_by_name">[docs]</a><span class="k">def</span> <span class="nf">pcoll_by_name</span><span class="p">()</span> <span class="o">-&gt;</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Finds all PCollections by their variable names defined in the notebook.&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span>
<span class="n">inspectables</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">inspector_with_synthetic</span><span class="o">.</span><span class="n">inspectables</span>
<span class="n">pcolls</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">for</span> <span class="n">_</span><span class="p">,</span> <span class="n">inspectable</span> <span class="ow">in</span> <span class="n">inspectables</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="n">metadata</span> <span class="o">=</span> <span class="n">inspectable</span><span class="p">[</span><span class="s1">&#39;metadata&#39;</span><span class="p">]</span>
<span class="k">if</span> <span class="n">metadata</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;pcollection&#39;</span><span class="p">:</span>
<span class="n">pcolls</span><span class="p">[</span><span class="n">metadata</span><span class="p">[</span><span class="s1">&#39;name&#39;</span><span class="p">]]</span> <span class="o">=</span> <span class="n">inspectable</span><span class="p">[</span><span class="s1">&#39;value&#39;</span><span class="p">]</span>
<span class="k">return</span> <span class="n">pcolls</span></div>
<div class="viewcode-block" id="find_pcoll_name"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.find_pcoll_name">[docs]</a><span class="k">def</span> <span class="nf">find_pcoll_name</span><span class="p">(</span><span class="n">pcoll</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">str</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Finds the variable name of a PCollection defined by the user.</span>
<span class="sd"> Returns None if not assigned to any variable.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span>
<span class="n">inspectables</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">inspector</span><span class="o">.</span><span class="n">inspectables</span>
<span class="k">for</span> <span class="n">_</span><span class="p">,</span> <span class="n">inspectable</span> <span class="ow">in</span> <span class="n">inspectables</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="k">if</span> <span class="n">inspectable</span><span class="p">[</span><span class="s1">&#39;value&#39;</span><span class="p">]</span> <span class="ow">is</span> <span class="n">pcoll</span><span class="p">:</span>
<span class="k">return</span> <span class="n">inspectable</span><span class="p">[</span><span class="s1">&#39;metadata&#39;</span><span class="p">][</span><span class="s1">&#39;name&#39;</span><span class="p">]</span>
<span class="k">return</span> <span class="kc">None</span></div>
<div class="viewcode-block" id="cacheables"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.cacheables">[docs]</a><span class="k">def</span> <span class="nf">cacheables</span><span class="p">()</span> <span class="o">-&gt;</span> <span class="n">Dict</span><span class="p">[</span><span class="n">CacheKey</span><span class="p">,</span> <span class="n">Cacheable</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Finds all Cacheables with their CacheKeys.&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span>
<span class="n">inspectables</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">inspector_with_synthetic</span><span class="o">.</span><span class="n">inspectables</span>
<span class="n">cacheables</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">for</span> <span class="n">_</span><span class="p">,</span> <span class="n">inspectable</span> <span class="ow">in</span> <span class="n">inspectables</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="n">metadata</span> <span class="o">=</span> <span class="n">inspectable</span><span class="p">[</span><span class="s1">&#39;metadata&#39;</span><span class="p">]</span>
<span class="k">if</span> <span class="n">metadata</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;pcollection&#39;</span><span class="p">:</span>
<span class="n">cacheable</span> <span class="o">=</span> <span class="n">Cacheable</span><span class="o">.</span><span class="n">from_pcoll</span><span class="p">(</span><span class="n">metadata</span><span class="p">[</span><span class="s1">&#39;name&#39;</span><span class="p">],</span> <span class="n">inspectable</span><span class="p">[</span><span class="s1">&#39;value&#39;</span><span class="p">])</span>
<span class="n">cacheables</span><span class="p">[</span><span class="n">cacheable</span><span class="o">.</span><span class="n">to_key</span><span class="p">()]</span> <span class="o">=</span> <span class="n">cacheable</span>
<span class="k">return</span> <span class="n">cacheables</span></div>
<div class="viewcode-block" id="watch_sources"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.watch_sources">[docs]</a><span class="k">def</span> <span class="nf">watch_sources</span><span class="p">(</span><span class="n">pipeline</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Watches the unbounded sources in the pipeline.</span>
<span class="sd"> Sources can output to a PCollection without a user variable reference. In</span>
<span class="sd"> this case the source is not cached. We still want to cache the data so we</span>
<span class="sd"> synthetically create a variable to the intermediate PCollection.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">apache_beam.pipeline</span> <span class="kn">import</span> <span class="n">PipelineVisitor</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span>
<span class="n">retrieved_user_pipeline</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">user_pipeline</span><span class="p">(</span><span class="n">pipeline</span><span class="p">)</span>
<span class="n">pcoll_to_name</span> <span class="o">=</span> <span class="p">{</span><span class="n">v</span><span class="p">:</span> <span class="n">k</span> <span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">pcoll_by_name</span><span class="p">()</span><span class="o">.</span><span class="n">items</span><span class="p">()}</span>
<span class="k">class</span> <span class="nc">CacheableUnboundedPCollectionVisitor</span><span class="p">(</span><span class="n">PipelineVisitor</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">unbounded_pcolls</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">enter_composite_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">visit_transform</span><span class="p">(</span><span class="n">transform_node</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">visit_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">transform_node</span><span class="o">.</span><span class="n">transform</span><span class="p">,</span>
<span class="nb">tuple</span><span class="p">(</span><span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">recordable_sources</span><span class="p">)):</span>
<span class="k">for</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">outputs</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="c1"># Only generate a synthetic var when it&#39;s not already watched. For</span>
<span class="c1"># example, the user could have assigned the unbounded source output</span>
<span class="c1"># to a variable, watching it again with a different variable name</span>
<span class="c1"># creates ambiguity.</span>
<span class="k">if</span> <span class="n">pcoll</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">pcoll_to_name</span><span class="p">:</span>
<span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">watch</span><span class="p">({</span><span class="s1">&#39;synthetic_var_&#39;</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="nb">id</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)):</span> <span class="n">pcoll</span><span class="p">})</span>
<span class="n">retrieved_user_pipeline</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">CacheableUnboundedPCollectionVisitor</span><span class="p">())</span></div>
<div class="viewcode-block" id="has_unbounded_sources"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.has_unbounded_sources">[docs]</a><span class="k">def</span> <span class="nf">has_unbounded_sources</span><span class="p">(</span><span class="n">pipeline</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Checks if a given pipeline has recordable sources.&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="n">unbounded_sources</span><span class="p">(</span><span class="n">pipeline</span><span class="p">))</span> <span class="o">&gt;</span> <span class="mi">0</span></div>
<div class="viewcode-block" id="unbounded_sources"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.unbounded_sources">[docs]</a><span class="k">def</span> <span class="nf">unbounded_sources</span><span class="p">(</span><span class="n">pipeline</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns a pipeline&#39;s recordable sources.&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">apache_beam.pipeline</span> <span class="kn">import</span> <span class="n">PipelineVisitor</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span>
<span class="k">class</span> <span class="nc">CheckUnboundednessVisitor</span><span class="p">(</span><span class="n">PipelineVisitor</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Visitor checks if there are any unbounded read sources in the Pipeline.</span>
<span class="sd"> Visitor visits all nodes and checks if it is an instance of recordable</span>
<span class="sd"> sources.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">unbounded_sources</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">def</span> <span class="nf">enter_composite_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">visit_transform</span><span class="p">(</span><span class="n">transform_node</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">visit_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">transform_node</span><span class="o">.</span><span class="n">transform</span><span class="p">,</span>
<span class="nb">tuple</span><span class="p">(</span><span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">recordable_sources</span><span class="p">)):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">unbounded_sources</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">transform_node</span><span class="p">)</span>
<span class="n">v</span> <span class="o">=</span> <span class="n">CheckUnboundednessVisitor</span><span class="p">()</span>
<span class="n">pipeline</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">v</span><span class="p">)</span>
<span class="k">return</span> <span class="n">v</span><span class="o">.</span><span class="n">unbounded_sources</span></div>
<div class="viewcode-block" id="create_var_in_main"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.create_var_in_main">[docs]</a><span class="k">def</span> <span class="nf">create_var_in_main</span><span class="p">(</span><span class="n">name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">value</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span>
<span class="n">watch</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">True</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Tuple</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Declares a variable in the main module.</span>
<span class="sd"> Args:</span>
<span class="sd"> name: the variable name in the main module.</span>
<span class="sd"> value: the value of the variable.</span>
<span class="sd"> watch: whether to watch it in the interactive environment.</span>
<span class="sd"> Returns:</span>
<span class="sd"> A 2-entry tuple of the variable name and value.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nb">setattr</span><span class="p">(</span><span class="n">importlib</span><span class="o">.</span><span class="n">import_module</span><span class="p">(</span><span class="s1">&#39;__main__&#39;</span><span class="p">),</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span>
<span class="k">if</span> <span class="n">watch</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span>
<span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">watch</span><span class="p">({</span><span class="n">name</span><span class="p">:</span> <span class="n">value</span><span class="p">})</span>
<span class="k">return</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span></div>
<!-- Generated by sphinx.ext.viewcode: syntax-highlighted copy of the Python
     function assert_bucket_exists (the span classes are Pygments tokens).
     Do not hand-edit; change the Python source and rebuild the docs.
     As rendered, the function builds a storage.StorageV1 client, issues a
     StorageBucketsGetRequest for bucket_name, logs an error and raises
     ValueError on an HTTP 404, and only logs a warning for any other
     HttpError or for an ImportError.
     NOTE(review): HttpError is imported inside the try block, which makes it a
     function local. If that import itself raises ImportError, evaluating
     "except HttpError" first raises UnboundLocalError, so the
     "except ImportError" branch can never run and the caller gets a crash
     instead of the documented warning. Fix upstream (e.g. import HttpError in
     its own try/except ImportError before building the request), then
     rebuild this page.
     NOTE(review): the [docs] back-link fragment names
     apache_beam.runners.interactive.messaging.interactive_environment_inspector
     rather than apache_beam.runners.interactive.utils; confirm that anchor
     exists on the target page.
     NOTE(review): the Python lines below carry no leading indentation in this
     copy; confirm against the published page and regenerate if needed.
--><div class="viewcode-block" id="assert_bucket_exists"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.assert_bucket_exists">[docs]</a><span class="k">def</span> <span class="nf">assert_bucket_exists</span><span class="p">(</span><span class="n">bucket_name</span><span class="p">):</span>
<span class="c1"># type: (str) -&gt; None</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Asserts whether the specified GCS bucket with the name</span>
<span class="sd"> bucket_name exists.</span>
<span class="sd"> Logs an error and raises a ValueError if the bucket does not exist.</span>
<span class="sd"> Logs a warning if the bucket cannot be verified to exist.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">apitools.base.py.exceptions</span> <span class="kn">import</span> <span class="n">HttpError</span>
<span class="n">storage_client</span> <span class="o">=</span> <span class="n">storage</span><span class="o">.</span><span class="n">StorageV1</span><span class="p">(</span>
<span class="n">credentials</span><span class="o">=</span><span class="n">auth</span><span class="o">.</span><span class="n">get_service_credentials</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">()),</span>
<span class="n">get_credentials</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">http</span><span class="o">=</span><span class="n">get_new_http</span><span class="p">(),</span>
<span class="n">response_encoding</span><span class="o">=</span><span class="s1">&#39;utf8&#39;</span><span class="p">)</span>
<span class="n">request</span> <span class="o">=</span> <span class="n">storage</span><span class="o">.</span><span class="n">StorageBucketsGetRequest</span><span class="p">(</span><span class="n">bucket</span><span class="o">=</span><span class="n">bucket_name</span><span class="p">)</span>
<span class="n">storage_client</span><span class="o">.</span><span class="n">buckets</span><span class="o">.</span><span class="n">Get</span><span class="p">(</span><span class="n">request</span><span class="p">)</span>
<span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="k">if</span> <span class="n">e</span><span class="o">.</span><span class="n">status_code</span> <span class="o">==</span> <span class="mi">404</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s1">&#39;</span><span class="si">%s</span><span class="s1"> bucket does not exist!&#39;</span><span class="p">,</span> <span class="n">bucket_name</span><span class="p">)</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Invalid GCS bucket provided!&#39;</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;HttpError - unable to verify whether bucket </span><span class="si">%s</span><span class="s1"> exists&#39;</span><span class="p">,</span> <span class="n">bucket_name</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;ImportError - unable to verify whether bucket </span><span class="si">%s</span><span class="s1"> exists&#39;</span><span class="p">,</span> <span class="n">bucket_name</span><span class="p">)</span></div>
<!-- Generated by sphinx.ext.viewcode: syntax-highlighted copy of the Python
     function detect_pipeline_runner. Do not hand-edit; change the Python
     source and rebuild the docs.
     As rendered, the function returns pipeline.runner for a Pipeline argument,
     unwrapping an InteractiveRunner to its _underlying_runner, and returns
     None for any argument that is not a Pipeline. InteractiveRunner is
     imported inside the function body, only on the Pipeline path.
     NOTE(review): the [docs] back-link fragment names
     apache_beam.runners.interactive.messaging.interactive_environment_inspector
     rather than apache_beam.runners.interactive.utils; confirm that anchor
     exists on the target page.
--><div class="viewcode-block" id="detect_pipeline_runner"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.detect_pipeline_runner">[docs]</a><span class="k">def</span> <span class="nf">detect_pipeline_runner</span><span class="p">(</span><span class="n">pipeline</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pipeline</span><span class="p">,</span> <span class="n">Pipeline</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.interactive_runner</span> <span class="kn">import</span> <span class="n">InteractiveRunner</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pipeline</span><span class="o">.</span><span class="n">runner</span><span class="p">,</span> <span class="n">InteractiveRunner</span><span class="p">):</span>
<span class="n">pipeline_runner</span> <span class="o">=</span> <span class="n">pipeline</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">_underlying_runner</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">pipeline_runner</span> <span class="o">=</span> <span class="n">pipeline</span><span class="o">.</span><span class="n">runner</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">pipeline_runner</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">return</span> <span class="n">pipeline_runner</span></div>
</pre></div>
</div>
</div>
<footer>
<!-- Page footer, apparently emitted by the sphinx_rtd_theme layout template.
     NOTE(review): the copyright line shows no holder or year; presumably the
     Sphinx "copyright" setting is empty. Confirm and set it in conf.py rather
     than editing this generated page. -->
<hr>
<div role="contentinfo">
<p>
&copy; Copyright
</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
  // When the DOM has finished loading, hand control of the sidebar navigation
  // to the Read the Docs theme. SphinxRtdTheme is presumably defined by
  // _static/js/theme.js, which is loaded in <head>.
  jQuery(document).ready(function () {
    SphinxRtdTheme.Navigation.enable(true);
  });
</script>
</body>
</html>