| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <!-- Document metadata. --> |
| <meta charset="utf-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <title>apache_beam.runners.interactive.utils — Apache Beam 2.47.0 documentation</title> |
| |
| <!-- Scripts, in the same load order as the original markup. --> |
| <script src="../../../../_static/js/modernizr.min.js"></script> |
| <script id="documentation_options" data-url_root="../../../../" src="../../../../_static/documentation_options.js"></script> |
| <script src="../../../../_static/jquery.js"></script> |
| <script src="../../../../_static/underscore.js"></script> |
| <script src="../../../../_static/doctools.js"></script> |
| <script src="../../../../_static/language_data.js"></script> |
| <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
| <script src="../../../../_static/js/theme.js"></script> |
| |
| <!-- Stylesheets, followed by the index and search relation links. --> |
| <link rel="stylesheet" href="../../../../_static/css/theme.css"> |
| <link rel="stylesheet" href="../../../../_static/pygments.css"> |
| <link rel="index" title="Index" href="../../../../genindex.html"> |
| <link rel="search" title="Search" href="../../../../search.html"> |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search" > |
| |
| |
| |
| <a href="../../../../index.html" class="icon icon-home"> Apache Beam |
| |
| |
| |
| </a> |
| |
| |
| |
| |
| <div class="version"> |
| 2.47.0 |
| </div> |
| |
| |
| |
| |
| <div role="search"> |
| <!-- Sidebar search form: sends the query field "q" to search.html with GET. --> |
| <form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get"> |
| <!-- aria-label supplies the accessible name; a placeholder alone is not a label. --> |
| <input type="text" name="q" placeholder="Search docs" aria-label="Search docs" /> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| </div> |
| |
| |
| </div> |
| |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| <!-- Main sidebar menu: two link lists pointing at the API reference pages. --> |
| |
| |
| |
| |
| |
| |
| <!-- Links to the apache_beam sub-package reference pages. --> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.io.html">apache_beam.io package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.ml.html">apache_beam.ml package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.options.html">apache_beam.options package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.testing.html">apache_beam.testing package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.yaml.html">apache_beam.yaml package</a></li> |
| </ul> |
| <!-- Links to the top-level apache_beam module reference pages. --> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.error.html">apache_beam.error module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
| </ul> |
| |
| |
| |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
| <!-- Top bar: a bars icon followed by a link back to the documentation home page. --> |
| |
| <!-- NOTE(review): the icon carries data-toggle="wy-nav-top" and is presumably the click target the theme script uses to open the sidebar; it is not a button element, so confirm it can be reached and activated from the keyboard. --> |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="../../../../index.html">Apache Beam</a> |
| |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| <!-- Breadcrumb trail: Docs » Module code » current module, then an empty aside slot. --> |
| <ul class="wy-breadcrumbs"> |
| <li><a href="../../../../index.html">Docs</a> »</li> |
| <li><a href="../../../index.html">Module code</a> »</li> |
| <li>apache_beam.runners.interactive.utils</li> |
| <li class="wy-breadcrumbs-aside"> |
| </li> |
| </ul> |
| <hr> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.runners.interactive.utils</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="sd">"""Utilities to be used in Interactive Beam.</span> |
| <span class="sd">"""</span> |
| |
| <span class="kn">import</span> <span class="nn">functools</span> |
| <span class="kn">import</span> <span class="nn">hashlib</span> |
| <span class="kn">import</span> <span class="nn">importlib</span> |
| <span class="kn">import</span> <span class="nn">json</span> |
| <span class="kn">import</span> <span class="nn">logging</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Any</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Dict</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Tuple</span> |
| |
| <span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span> |
| |
| <span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.dataframe.convert</span> <span class="kn">import</span> <span class="n">to_pcollection</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.dataframe.frame_base</span> <span class="kn">import</span> <span class="n">DeferredBase</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.internal.gcp</span> <span class="kn">import</span> <span class="n">auth</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.internal.http_client</span> <span class="kn">import</span> <span class="n">get_new_http</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.internal.clients</span> <span class="kn">import</span> <span class="n">storage</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="kn">import</span> <span class="n">PipelineOptions</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.pipeline</span> <span class="kn">import</span> <span class="n">Pipeline</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.portability.api</span> <span class="kn">import</span> <span class="n">beam_runner_api_pb2</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.caching.cacheable</span> <span class="kn">import</span> <span class="n">Cacheable</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.caching.cacheable</span> <span class="kn">import</span> <span class="n">CacheKey</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.caching.expression_cache</span> <span class="kn">import</span> <span class="n">ExpressionCache</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.testing.test_stream</span> <span class="kn">import</span> <span class="n">WindowedValueHolder</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.typehints.schemas</span> <span class="kn">import</span> <span class="n">named_fields_from_element_type</span> |
| |
| <span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span> |
| |
| <span class="c1"># Add line breaks to the IPythonLogHandler's HTML output.</span> |
| <span class="n">_INTERACTIVE_LOG_STYLE</span> <span class="o">=</span> <span class="s2">"""</span> |
| <span class="s2"> &lt;style&gt;</span> |
| <span class="s2"> div.alert {</span> |
| <span class="s2"> white-space: pre-line;</span> |
| <span class="s2"> }</span> |
| <span class="s2"> &lt;/style&gt;</span> |
| <span class="s2">"""</span> |
| |
| |
| <div class="viewcode-block" id="to_element_list"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.to_element_list">[docs]</a><span class="k">def</span> <span class="nf">to_element_list</span><span class="p">(</span> |
| <span class="n">reader</span><span class="p">,</span> <span class="c1"># type: Generator[Union[beam_runner_api_pb2.TestStreamPayload.Event, WindowedValueHolder]] # noqa: F821</span> |
| <span class="n">coder</span><span class="p">,</span> <span class="c1"># type: Coder # noqa: F821</span> |
| <span class="n">include_window_info</span><span class="p">,</span> <span class="c1"># type: bool</span> |
| <span class="n">n</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: int</span> |
| <span class="n">include_time_events</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="c1"># type: bool</span> |
| <span class="p">):</span> |
| <span class="c1"># type: (...) -> List[WindowedValue] # noqa: F821</span> |
| |
| <span class="w"> </span><span class="sd">"""Returns an iterator that properly decodes the elements from the reader.</span> |
| <span class="sd"> """</span> |
| |
| <span class="c1"># Defining a generator like this makes it easier to limit the count of</span> |
| <span class="c1"># elements read. Otherwise, the count limit would need to be duplicated.</span> |
| <span class="k">def</span> <span class="nf">elements</span><span class="p">():</span> |
| <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">reader</span><span class="p">:</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">e</span><span class="p">,</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">TestStreamPayload</span><span class="o">.</span><span class="n">Event</span><span class="p">):</span> |
| <span class="k">if</span> <span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">HasField</span><span class="p">(</span><span class="s1">'watermark_event'</span><span class="p">)</span> <span class="ow">or</span> |
| <span class="n">e</span><span class="o">.</span><span class="n">HasField</span><span class="p">(</span><span class="s1">'processing_time_event'</span><span class="p">)):</span> |
| <span class="k">if</span> <span class="n">include_time_events</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">e</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">tv</span> <span class="ow">in</span> <span class="n">e</span><span class="o">.</span><span class="n">element_event</span><span class="o">.</span><span class="n">elements</span><span class="p">:</span> |
| <span class="n">decoded</span> <span class="o">=</span> <span class="n">coder</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="n">tv</span><span class="o">.</span><span class="n">encoded_element</span><span class="p">)</span> |
| <span class="k">yield</span> <span class="p">(</span> |
| <span class="n">decoded</span><span class="o">.</span><span class="n">windowed_value</span> |
| <span class="k">if</span> <span class="n">include_window_info</span> <span class="k">else</span> <span class="n">decoded</span><span class="o">.</span><span class="n">windowed_value</span><span class="o">.</span><span class="n">value</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">e</span><span class="p">,</span> <span class="n">WindowedValueHolder</span><span class="p">):</span> |
| <span class="k">yield</span> <span class="p">(</span> |
| <span class="n">e</span><span class="o">.</span><span class="n">windowed_value</span> <span class="k">if</span> <span class="n">include_window_info</span> <span class="k">else</span> <span class="n">e</span><span class="o">.</span><span class="n">windowed_value</span><span class="o">.</span><span class="n">value</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">e</span> |
| |
| <span class="c1"># Because we can yield multiple elements from a single TestStreamFileRecord,</span> |
| <span class="c1"># we have to limit the count here to ensure that `n` is fulfilled.</span> |
| <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">elements</span><span class="p">():</span> |
| <span class="k">if</span> <span class="n">n</span> <span class="ow">and</span> <span class="n">count</span> <span class="o">>=</span> <span class="n">n</span><span class="p">:</span> |
| <span class="k">break</span> |
| |
| <span class="k">yield</span> <span class="n">e</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">e</span><span class="p">,</span> <span class="n">beam_runner_api_pb2</span><span class="o">.</span><span class="n">TestStreamPayload</span><span class="o">.</span><span class="n">Event</span><span class="p">):</span> |
| <span class="n">count</span> <span class="o">+=</span> <span class="mi">1</span></div> |
| |
| |
| <div class="viewcode-block" id="elements_to_df"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.elements_to_df">[docs]</a><span class="k">def</span> <span class="nf">elements_to_df</span><span class="p">(</span><span class="n">elements</span><span class="p">,</span> <span class="n">include_window_info</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">element_type</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="c1"># type: (List[WindowedValue], bool, Any) -> DataFrame # noqa: F821</span> |
| |
| <span class="w"> </span><span class="sd">"""Parses the given elements into a Dataframe.</span> |
| |
| <span class="sd"> If the elements are a list of WindowedValues, then it will break out the</span> |
| <span class="sd"> elements into their own DataFrame and return it. If include_window_info is</span> |
| <span class="sd"> True, then it will concatenate the windowing information onto the elements</span> |
| <span class="sd"> DataFrame.</span> |
| <span class="sd"> """</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">columns_names</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="n">name</span> <span class="k">for</span> <span class="n">name</span><span class="p">,</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">named_fields_from_element_type</span><span class="p">(</span><span class="n">element_type</span><span class="p">)</span> |
| <span class="p">]</span> |
| <span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span> |
| <span class="n">columns_names</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="n">rows</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="n">windowed_info</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">elements</span><span class="p">:</span> |
| <span class="n">rows</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">value</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">include_window_info</span><span class="p">:</span> |
| <span class="n">windowed_info</span><span class="o">.</span><span class="n">append</span><span class="p">([</span><span class="n">e</span><span class="o">.</span><span class="n">timestamp</span><span class="o">.</span><span class="n">micros</span><span class="p">,</span> <span class="n">e</span><span class="o">.</span><span class="n">windows</span><span class="p">,</span> <span class="n">e</span><span class="o">.</span><span class="n">pane_info</span><span class="p">])</span> |
| |
| <span class="n">using_dataframes</span> <span class="o">=</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element_type</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span> |
| <span class="n">using_series</span> <span class="o">=</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">element_type</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">using_dataframes</span> <span class="ow">or</span> <span class="n">using_series</span><span class="p">:</span> |
| <span class="n">rows_df</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">concat</span><span class="p">(</span><span class="n">rows</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">rows_df</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">(</span><span class="n">rows</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="n">columns_names</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">include_window_info</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">using_series</span><span class="p">:</span> |
| <span class="n">windowed_info_df</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">(</span> |
| <span class="n">windowed_info</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s1">'event_time'</span><span class="p">,</span> <span class="s1">'windows'</span><span class="p">,</span> <span class="s1">'pane_info'</span><span class="p">])</span> |
| <span class="n">final_df</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">concat</span><span class="p">([</span><span class="n">rows_df</span><span class="p">,</span> <span class="n">windowed_info_df</span><span class="p">],</span> <span class="n">axis</span><span class="o">=</span><span class="mi">1</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">final_df</span> <span class="o">=</span> <span class="n">rows_df</span> |
| |
| <span class="k">return</span> <span class="n">final_df</span></div> |
| |
| |
| <div class="viewcode-block" id="register_ipython_log_handler"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.register_ipython_log_handler">[docs]</a><span class="k">def</span> <span class="nf">register_ipython_log_handler</span><span class="p">():</span> |
| <span class="c1"># type: () -> None</span> |
| |
| <span class="w"> </span><span class="sd">"""Adds the IPython handler to a dummy parent logger (named</span> |
| <span class="sd"> 'apache_beam.runners.interactive') of all interactive modules' loggers so that</span> |
| <span class="sd"> if is_in_notebook, logging displays the logs as HTML in frontends.</span> |
| <span class="sd"> """</span> |
| |
| <span class="c1"># apache_beam.runners.interactive is not a module, thus this "root" logger is</span> |
| <span class="c1"># a dummy one created to hold the IPython log handler. When children loggers</span> |
| <span class="c1"># have propagate as True (by default) and logging level as NOTSET (by default,</span> |
| <span class="c1"># so the "root" logger's logging level takes effect), the IPython log handler</span> |
| <span class="c1"># will be triggered at the "root"'s own logging level. And if a child logger</span> |
| <span class="c1"># sets its logging level, it can take control back.</span> |
| <span class="n">interactive_root_logger</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="s1">'apache_beam.runners.interactive'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="nb">any</span><span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">h</span><span class="p">,</span> <span class="n">IPythonLogHandler</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">h</span> <span class="ow">in</span> <span class="n">interactive_root_logger</span><span class="o">.</span><span class="n">handlers</span><span class="p">):</span> |
| <span class="k">return</span> |
| <span class="n">interactive_root_logger</span><span class="o">.</span><span class="n">setLevel</span><span class="p">(</span><span class="n">logging</span><span class="o">.</span><span class="n">INFO</span><span class="p">)</span> |
| <span class="n">interactive_root_logger</span><span class="o">.</span><span class="n">addHandler</span><span class="p">(</span><span class="n">IPythonLogHandler</span><span class="p">())</span> |
| <span class="c1"># Disable the propagation so that logs emitted from interactive modules should</span> |
| <span class="c1"># only be handled by loggers and handlers defined within interactive packages.</span> |
| <span class="n">interactive_root_logger</span><span class="o">.</span><span class="n">propagate</span> <span class="o">=</span> <span class="kc">False</span></div> |
| |
| |
| <div class="viewcode-block" id="IPythonLogHandler"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.IPythonLogHandler">[docs]</a><span class="k">class</span> <span class="nc">IPythonLogHandler</span><span class="p">(</span><span class="n">logging</span><span class="o">.</span><span class="n">Handler</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A logging handler to display logs as HTML in IPython backed frontends."""</span> |
| <span class="c1"># TODO(BEAM-7923): Switch to Google hosted CDN once</span> |
| <span class="c1"># https://code.google.com/archive/p/google-ajax-apis/issues/637 is resolved.</span> |
| <span class="n">log_template</span> <span class="o">=</span> <span class="s2">"""</span> |
| <span class="s2"> &lt;link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous"&gt;</span> |
| <span class="s2"> &lt;div class="alert alert-</span><span class="si">{level}</span><span class="s2">"&gt;</span><span class="si">{msg}</span><span class="s2">&lt;/div&gt;"""</span> |
| |
| <span class="n">logging_to_alert_level_map</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">CRITICAL</span><span class="p">:</span> <span class="s1">'danger'</span><span class="p">,</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">ERROR</span><span class="p">:</span> <span class="s1">'danger'</span><span class="p">,</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">WARNING</span><span class="p">:</span> <span class="s1">'warning'</span><span class="p">,</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">INFO</span><span class="p">:</span> <span class="s1">'info'</span><span class="p">,</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">DEBUG</span><span class="p">:</span> <span class="s1">'dark'</span><span class="p">,</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">NOTSET</span><span class="p">:</span> <span class="s1">'light'</span> |
| <span class="p">}</span> |
| |
| <div class="viewcode-block" id="IPythonLogHandler.emit"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.IPythonLogHandler.emit">[docs]</a> <span class="k">def</span> <span class="nf">emit</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">record</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">html</span> <span class="kn">import</span> <span class="n">escape</span> |
| <span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">HTML</span> |
| <span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">display</span> |
| <span class="n">display</span><span class="p">(</span><span class="n">HTML</span><span class="p">(</span><span class="n">_INTERACTIVE_LOG_STYLE</span><span class="p">))</span> |
| <span class="n">display</span><span class="p">(</span> |
| <span class="n">HTML</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log_template</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">level</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">logging_to_alert_level_map</span><span class="p">[</span><span class="n">record</span><span class="o">.</span><span class="n">levelno</span><span class="p">],</span> |
| <span class="n">msg</span><span class="o">=</span><span class="n">escape</span><span class="p">(</span><span class="n">record</span><span class="o">.</span><span class="n">msg</span> <span class="o">%</span> <span class="n">record</span><span class="o">.</span><span class="n">args</span><span class="p">))))</span> |
| <span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span> |
| <span class="k">pass</span> <span class="c1"># NOOP when dependencies are not available.</span></div></div> |
| |
| |
| <div class="viewcode-block" id="obfuscate"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.obfuscate">[docs]</a><span class="k">def</span> <span class="nf">obfuscate</span><span class="p">(</span><span class="o">*</span><span class="n">inputs</span><span class="p">):</span> |
| <span class="c1"># type: (*Any) -> str</span> |
| |
| <span class="w"> </span><span class="sd">"""Obfuscates any inputs into a hexadecimal string."""</span> |
| <span class="n">str_inputs</span> <span class="o">=</span> <span class="p">[</span><span class="nb">str</span><span class="p">(</span><span class="nb">input</span><span class="p">)</span> <span class="k">for</span> <span class="nb">input</span> <span class="ow">in</span> <span class="n">inputs</span><span class="p">]</span> |
| <span class="n">merged_inputs</span> <span class="o">=</span> <span class="s1">'_'</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">str_inputs</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">hashlib</span><span class="o">.</span><span class="n">md5</span><span class="p">(</span><span class="n">merged_inputs</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">))</span><span class="o">.</span><span class="n">hexdigest</span><span class="p">()</span></div> |
| |
| |
| <div class="viewcode-block" id="ProgressIndicator"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.ProgressIndicator">[docs]</a><span class="k">class</span> <span class="nc">ProgressIndicator</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""An indicator visualizing code execution in progress."""</span> |
| <span class="c1"># TODO(BEAM-7923): Switch to Google hosted CDN once</span> |
| <span class="c1"># https://code.google.com/archive/p/google-ajax-apis/issues/637 is resolved.</span> |
| <span class="n">spinner_template</span> <span class="o">=</span> <span class="s2">"""</span> |
| <span class="s2"> &lt;link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous"&gt;</span> |
| <span class="s2"> &lt;div id="</span><span class="si">{id}</span><span class="s2">"&gt;</span> |
| <span class="s2"> &lt;div class="spinner-border text-info" role="status"&gt;&lt;/div&gt;</span> |
| <span class="s2"> &lt;span class="text-info"&gt;</span><span class="si">{text}</span><span class="s2">&lt;/span&gt;</span> |
| <span class="s2"> &lt;/div&gt;</span> |
| <span class="s2"> """</span> |
| <span class="n">spinner_removal_template</span> <span class="o">=</span> <span class="s2">"""</span> |
| <span class="s2"> $("#</span><span class="si">{id}</span><span class="s2">").remove();"""</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">enter_text</span><span class="p">,</span> <span class="n">exit_text</span><span class="p">):</span> |
| <span class="c1"># type: (str, str) -> None</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_id</span> <span class="o">=</span> <span class="s1">'progress_indicator_</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">obfuscate</span><span class="p">(</span><span class="nb">id</span><span class="p">(</span><span class="bp">self</span><span class="p">)))</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_enter_text</span> <span class="o">=</span> <span class="n">enter_text</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_exit_text</span> <span class="o">=</span> <span class="n">exit_text</span> |
| |
| <span class="k">def</span> <span class="fm">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">HTML</span> |
| <span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">display</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span> |
| <span class="k">if</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">is_in_notebook</span><span class="p">:</span> |
| <span class="n">display</span><span class="p">(</span> |
| <span class="n">HTML</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">spinner_template</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="nb">id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_id</span><span class="p">,</span> <span class="n">text</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_enter_text</span><span class="p">)))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">display</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_enter_text</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">ImportError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">error</span><span class="p">(</span> |
| <span class="s1">'Please use interactive Beam features in an IPython'</span> |
| <span class="s1">'or notebook environment: </span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="n">e</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="fm">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exc_type</span><span class="p">,</span> <span class="n">exc_value</span><span class="p">,</span> <span class="n">traceback</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">Javascript</span> |
| <span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">display</span> |
| <span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">display_javascript</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span> |
| <span class="k">if</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">is_in_notebook</span><span class="p">:</span> |
| <span class="n">script</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">spinner_removal_template</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_id</span><span class="p">)</span> |
| <span class="n">display_javascript</span><span class="p">(</span> |
| <span class="n">Javascript</span><span class="p">(</span> |
| <span class="n">ie</span><span class="o">.</span><span class="n">_JQUERY_WITH_DATATABLE_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">customized_script</span><span class="o">=</span><span class="n">script</span><span class="p">)))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">display</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_exit_text</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">ImportError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">error</span><span class="p">(</span> |
| <span class="s1">'Please use interactive Beam features in an IPython'</span> |
| <span class="s1">'or notebook environment: </span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="n">e</span><span class="p">)</span></div> |
| |
| |
| <div class="viewcode-block" id="progress_indicated"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.progress_indicated">[docs]</a><span class="k">def</span> <span class="nf">progress_indicated</span><span class="p">(</span><span class="n">func</span><span class="p">):</span> |
| <span class="c1"># type: (Callable[..., Any]) -> Callable[..., Any] # noqa: F821</span> |
| |
| <span class="w"> </span><span class="sd">"""A decorator using a unique progress indicator as a context manager to</span> |
| <span class="sd"> execute the given function within."""</span> |
| <span class="nd">@functools</span><span class="o">.</span><span class="n">wraps</span><span class="p">(</span><span class="n">func</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">run_within_progress_indicator</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">with</span> <span class="n">ProgressIndicator</span><span class="p">(</span><span class="sa">f</span><span class="s1">'Processing... </span><span class="si">{</span><span class="n">func</span><span class="o">.</span><span class="vm">__name__</span><span class="si">}</span><span class="s1">'</span><span class="p">,</span> <span class="s1">'Done.'</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">func</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">run_within_progress_indicator</span></div> |
| |
| |
| <div class="viewcode-block" id="as_json"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.as_json">[docs]</a><span class="k">def</span> <span class="nf">as_json</span><span class="p">(</span><span class="n">func</span><span class="p">):</span> |
| <span class="c1"># type: (Callable[..., Any]) -> Callable[..., str] # noqa: F821</span> |
| |
| <span class="w"> </span><span class="sd">"""A decorator convert python objects returned by callables to json</span> |
| <span class="sd"> string.</span> |
| |
| <span class="sd"> The decorated function should always return an object parsable by json.dumps.</span> |
| <span class="sd"> If the object is not parsable, the str() of original object is returned</span> |
| <span class="sd"> instead.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="nf">return_as_json</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">return_value</span> <span class="o">=</span> <span class="n">func</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="n">return_value</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span> |
| <span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="n">return_value</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">return_as_json</span></div> |
| |
| |
| <div class="viewcode-block" id="deferred_df_to_pcollection"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.deferred_df_to_pcollection">[docs]</a><span class="k">def</span> <span class="nf">deferred_df_to_pcollection</span><span class="p">(</span><span class="n">df</span><span class="p">):</span> |
| <span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">DeferredBase</span><span class="p">),</span> <span class="s1">'</span><span class="si">{}</span><span class="s1"> is not a DeferredBase'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">df</span><span class="p">)</span> |
| |
| <span class="c1"># The proxy is used to output a DataFrame with the correct columns.</span> |
| <span class="c1">#</span> |
| <span class="c1"># TODO(https://github.com/apache/beam/issues/20577): Once type hints are</span> |
| <span class="c1"># implemented for pandas, use those instead of the proxy.</span> |
| <span class="n">cache</span> <span class="o">=</span> <span class="n">ExpressionCache</span><span class="p">()</span> |
| <span class="n">cache</span><span class="o">.</span><span class="n">replace_with_cached</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">_expr</span><span class="p">)</span> |
| |
| <span class="n">proxy</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">_expr</span><span class="o">.</span><span class="n">proxy</span><span class="p">()</span> |
| <span class="k">return</span> <span class="n">to_pcollection</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">yield_elements</span><span class="o">=</span><span class="s1">'pandas'</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="nb">str</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">_expr</span><span class="p">)),</span> <span class="n">proxy</span></div> |
| |
| |
| <div class="viewcode-block" id="pcoll_by_name"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.pcoll_by_name">[docs]</a><span class="k">def</span> <span class="nf">pcoll_by_name</span><span class="p">()</span> <span class="o">-></span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""Finds all PCollections by their variable names defined in the notebook."""</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span> |
| |
| <span class="n">inspectables</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">inspector_with_synthetic</span><span class="o">.</span><span class="n">inspectables</span> |
| <span class="n">pcolls</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="k">for</span> <span class="n">_</span><span class="p">,</span> <span class="n">inspectable</span> <span class="ow">in</span> <span class="n">inspectables</span><span class="o">.</span><span class="n">items</span><span class="p">():</span> |
| <span class="n">metadata</span> <span class="o">=</span> <span class="n">inspectable</span><span class="p">[</span><span class="s1">'metadata'</span><span class="p">]</span> |
| <span class="k">if</span> <span class="n">metadata</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'pcollection'</span><span class="p">:</span> |
| <span class="n">pcolls</span><span class="p">[</span><span class="n">metadata</span><span class="p">[</span><span class="s1">'name'</span><span class="p">]]</span> <span class="o">=</span> <span class="n">inspectable</span><span class="p">[</span><span class="s1">'value'</span><span class="p">]</span> |
| <span class="k">return</span> <span class="n">pcolls</span></div> |
| |
| |
| <div class="viewcode-block" id="find_pcoll_name"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.find_pcoll_name">[docs]</a><span class="k">def</span> <span class="nf">find_pcoll_name</span><span class="p">(</span><span class="n">pcoll</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">)</span> <span class="o">-></span> <span class="nb">str</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""Finds the variable name of a PCollection defined by the user.</span> |
| |
| <span class="sd"> Returns None if not assigned to any variable.</span> |
| <span class="sd"> """</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span> |
| |
| <span class="n">inspectables</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">inspector</span><span class="o">.</span><span class="n">inspectables</span> |
| <span class="k">for</span> <span class="n">_</span><span class="p">,</span> <span class="n">inspectable</span> <span class="ow">in</span> <span class="n">inspectables</span><span class="o">.</span><span class="n">items</span><span class="p">():</span> |
| <span class="k">if</span> <span class="n">inspectable</span><span class="p">[</span><span class="s1">'value'</span><span class="p">]</span> <span class="ow">is</span> <span class="n">pcoll</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">inspectable</span><span class="p">[</span><span class="s1">'metadata'</span><span class="p">][</span><span class="s1">'name'</span><span class="p">]</span> |
| <span class="k">return</span> <span class="kc">None</span></div> |
| |
| |
| <div class="viewcode-block" id="cacheables"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.cacheables">[docs]</a><span class="k">def</span> <span class="nf">cacheables</span><span class="p">()</span> <span class="o">-></span> <span class="n">Dict</span><span class="p">[</span><span class="n">CacheKey</span><span class="p">,</span> <span class="n">Cacheable</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""Finds all Cacheables with their CacheKeys."""</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span> |
| |
| <span class="n">inspectables</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">inspector_with_synthetic</span><span class="o">.</span><span class="n">inspectables</span> |
| <span class="n">cacheables</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="k">for</span> <span class="n">_</span><span class="p">,</span> <span class="n">inspectable</span> <span class="ow">in</span> <span class="n">inspectables</span><span class="o">.</span><span class="n">items</span><span class="p">():</span> |
| <span class="n">metadata</span> <span class="o">=</span> <span class="n">inspectable</span><span class="p">[</span><span class="s1">'metadata'</span><span class="p">]</span> |
| <span class="k">if</span> <span class="n">metadata</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'pcollection'</span><span class="p">:</span> |
| <span class="n">cacheable</span> <span class="o">=</span> <span class="n">Cacheable</span><span class="o">.</span><span class="n">from_pcoll</span><span class="p">(</span><span class="n">metadata</span><span class="p">[</span><span class="s1">'name'</span><span class="p">],</span> <span class="n">inspectable</span><span class="p">[</span><span class="s1">'value'</span><span class="p">])</span> |
| <span class="n">cacheables</span><span class="p">[</span><span class="n">cacheable</span><span class="o">.</span><span class="n">to_key</span><span class="p">()]</span> <span class="o">=</span> <span class="n">cacheable</span> |
| <span class="k">return</span> <span class="n">cacheables</span></div> |
| |
| |
| <div class="viewcode-block" id="watch_sources"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.watch_sources">[docs]</a><span class="k">def</span> <span class="nf">watch_sources</span><span class="p">(</span><span class="n">pipeline</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Watches the unbounded sources in the pipeline.</span> |
| |
| <span class="sd"> Sources can output to a PCollection without a user variable reference. In</span> |
| <span class="sd"> this case the source is not cached. We still want to cache the data so we</span> |
| <span class="sd"> synthetically create a variable to the intermediate PCollection.</span> |
| <span class="sd"> """</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.pipeline</span> <span class="kn">import</span> <span class="n">PipelineVisitor</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span> |
| |
| <span class="n">retrieved_user_pipeline</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">user_pipeline</span><span class="p">(</span><span class="n">pipeline</span><span class="p">)</span> |
| <span class="n">pcoll_to_name</span> <span class="o">=</span> <span class="p">{</span><span class="n">v</span><span class="p">:</span> <span class="n">k</span> <span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">pcoll_by_name</span><span class="p">()</span><span class="o">.</span><span class="n">items</span><span class="p">()}</span> |
| |
| <span class="k">class</span> <span class="nc">CacheableUnboundedPCollectionVisitor</span><span class="p">(</span><span class="n">PipelineVisitor</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">unbounded_pcolls</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">enter_composite_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">visit_transform</span><span class="p">(</span><span class="n">transform_node</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">visit_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">transform_node</span><span class="o">.</span><span class="n">transform</span><span class="p">,</span> |
| <span class="nb">tuple</span><span class="p">(</span><span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">recordable_sources</span><span class="p">)):</span> |
| <span class="k">for</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">transform_node</span><span class="o">.</span><span class="n">outputs</span><span class="o">.</span><span class="n">values</span><span class="p">():</span> |
| <span class="c1"># Only generate a synthetic var when it's not already watched. For</span> |
| <span class="c1"># example, the user could have assigned the unbounded source output</span> |
| <span class="c1"># to a variable, watching it again with a different variable name</span> |
| <span class="c1"># creates ambiguity.</span> |
| <span class="k">if</span> <span class="n">pcoll</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">pcoll_to_name</span><span class="p">:</span> |
| <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">watch</span><span class="p">({</span><span class="s1">'synthetic_var_'</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="nb">id</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)):</span> <span class="n">pcoll</span><span class="p">})</span> |
| |
| <span class="n">retrieved_user_pipeline</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">CacheableUnboundedPCollectionVisitor</span><span class="p">())</span></div> |
| |
| |
| <div class="viewcode-block" id="has_unbounded_sources"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.has_unbounded_sources">[docs]</a><span class="k">def</span> <span class="nf">has_unbounded_sources</span><span class="p">(</span><span class="n">pipeline</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Checks if a given pipeline has recordable sources."""</span> |
| <span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="n">unbounded_sources</span><span class="p">(</span><span class="n">pipeline</span><span class="p">))</span> <span class="o">></span> <span class="mi">0</span></div> |
| |
| |
| <div class="viewcode-block" id="unbounded_sources"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.unbounded_sources">[docs]</a><span class="k">def</span> <span class="nf">unbounded_sources</span><span class="p">(</span><span class="n">pipeline</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Returns a pipeline's recordable sources."""</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.pipeline</span> <span class="kn">import</span> <span class="n">PipelineVisitor</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span> |
| |
| <span class="k">class</span> <span class="nc">CheckUnboundednessVisitor</span><span class="p">(</span><span class="n">PipelineVisitor</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Visitor checks if there are any unbounded read sources in the Pipeline.</span> |
| |
| <span class="sd"> Visitor visits all nodes and checks if it is an instance of recordable</span> |
| <span class="sd"> sources.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">unbounded_sources</span> <span class="o">=</span> <span class="p">[]</span> |
| |
| <span class="k">def</span> <span class="nf">enter_composite_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">visit_transform</span><span class="p">(</span><span class="n">transform_node</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">visit_transform</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">transform_node</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">transform_node</span><span class="o">.</span><span class="n">transform</span><span class="p">,</span> |
| <span class="nb">tuple</span><span class="p">(</span><span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">recordable_sources</span><span class="p">)):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">unbounded_sources</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">transform_node</span><span class="p">)</span> |
| |
| <span class="n">v</span> <span class="o">=</span> <span class="n">CheckUnboundednessVisitor</span><span class="p">()</span> |
| <span class="n">pipeline</span><span class="o">.</span><span class="n">visit</span><span class="p">(</span><span class="n">v</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">v</span><span class="o">.</span><span class="n">unbounded_sources</span></div> |
| |
| |
| <div class="viewcode-block" id="create_var_in_main"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.create_var_in_main">[docs]</a><span class="k">def</span> <span class="nf">create_var_in_main</span><span class="p">(</span><span class="n">name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">value</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> |
| <span class="n">watch</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">True</span><span class="p">)</span> <span class="o">-></span> <span class="n">Tuple</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""Declares a variable in the main module.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> name: the variable name in the main module.</span> |
| <span class="sd"> value: the value of the variable.</span> |
| <span class="sd"> watch: whether to watch it in the interactive environment.</span> |
| <span class="sd"> Returns:</span> |
| <span class="sd"> A 2-entry tuple of the variable name and value.</span> |
| <span class="sd"> """</span> |
| <span class="nb">setattr</span><span class="p">(</span><span class="n">importlib</span><span class="o">.</span><span class="n">import_module</span><span class="p">(</span><span class="s1">'__main__'</span><span class="p">),</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">watch</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span> |
| <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">watch</span><span class="p">({</span><span class="n">name</span><span class="p">:</span> <span class="n">value</span><span class="p">})</span> |
| <span class="k">return</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span></div> |
| |
| |
| <div class="viewcode-block" id="assert_bucket_exists"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.messaging.interactive_environment_inspector.assert_bucket_exists">[docs]</a><span class="k">def</span> <span class="nf">assert_bucket_exists</span><span class="p">(</span><span class="n">bucket_name</span><span class="p">):</span> |
| <span class="c1"># type: (str) -> None</span> |
| |
| <span class="w"> </span><span class="sd">"""Asserts whether the specified GCS bucket with the name</span> |
| <span class="sd"> bucket_name exists.</span> |
| |
| <span class="sd"> Logs an error and raises a ValueError if the bucket does not exist.</span> |
| |
| <span class="sd"> Logs a warning if the bucket cannot be verified to exist.</span> |
| <span class="sd"> """</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">apitools.base.py.exceptions</span> <span class="kn">import</span> <span class="n">HttpError</span> |
| <span class="n">storage_client</span> <span class="o">=</span> <span class="n">storage</span><span class="o">.</span><span class="n">StorageV1</span><span class="p">(</span> |
| <span class="n">credentials</span><span class="o">=</span><span class="n">auth</span><span class="o">.</span><span class="n">get_service_credentials</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">()),</span> |
| <span class="n">get_credentials</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">http</span><span class="o">=</span><span class="n">get_new_http</span><span class="p">(),</span> |
| <span class="n">response_encoding</span><span class="o">=</span><span class="s1">'utf8'</span><span class="p">)</span> |
| <span class="n">request</span> <span class="o">=</span> <span class="n">storage</span><span class="o">.</span><span class="n">StorageBucketsGetRequest</span><span class="p">(</span><span class="n">bucket</span><span class="o">=</span><span class="n">bucket_name</span><span class="p">)</span> |
| <span class="n">storage_client</span><span class="o">.</span><span class="n">buckets</span><span class="o">.</span><span class="n">Get</span><span class="p">(</span><span class="n">request</span><span class="p">)</span> |
| <span class="k">except</span> <span class="n">HttpError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">e</span><span class="o">.</span><span class="n">status_code</span> <span class="o">==</span> <span class="mi">404</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s1">'</span><span class="si">%s</span><span class="s1"> bucket does not exist!'</span><span class="p">,</span> <span class="n">bucket_name</span><span class="p">)</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'Invalid GCS bucket provided!'</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span> |
| <span class="s1">'HttpError - unable to verify whether bucket </span><span class="si">%s</span><span class="s1"> exists'</span><span class="p">,</span> <span class="n">bucket_name</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span> |
| <span class="s1">'ImportError - unable to verify whether bucket </span><span class="si">%s</span><span class="s1"> exists'</span><span class="p">,</span> <span class="n">bucket_name</span><span class="p">)</span></div> |
| |
| |
| <div class="viewcode-block" id="detect_pipeline_runner"><a class="viewcode-back" href="../../../../apache_beam.runners.interactive.utils.html#apache_beam.runners.interactive.utils.detect_pipeline_runner">[docs]</a><span class="k">def</span> <span class="nf">detect_pipeline_runner</span><span class="p">(</span><span class="n">pipeline</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pipeline</span><span class="p">,</span> <span class="n">Pipeline</span><span class="p">):</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.interactive_runner</span> <span class="kn">import</span> <span class="n">InteractiveRunner</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pipeline</span><span class="o">.</span><span class="n">runner</span><span class="p">,</span> <span class="n">InteractiveRunner</span><span class="p">):</span> |
| <span class="n">pipeline_runner</span> <span class="o">=</span> <span class="n">pipeline</span><span class="o">.</span><span class="n">runner</span><span class="o">.</span><span class="n">_underlying_runner</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">pipeline_runner</span> <span class="o">=</span> <span class="n">pipeline</span><span class="o">.</span><span class="n">runner</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">pipeline_runner</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">return</span> <span class="n">pipeline_runner</span></div> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| <footer> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <p> |
| © Copyright |
| |
| </p> |
| </div> |
| Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <script type="text/javascript"> |
| jQuery(function () { |
| SphinxRtdTheme.Navigation.enable(true); |
| }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |