<!-- blob: d896cbbb1eb77afc9b4903a959bdc67655fdde75 [file] [log] [blame] -->
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <!-- Document metadata: encoding, viewport and the page title. -->
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>apache_beam.runners.interactive.display.pcoll_visualization &mdash; Apache Beam 2.38.0 documentation</title>

  <!-- Classic (non-async) scripts execute in document order, so the sequence is kept as-is. -->
  <script src="../../../../../_static/js/modernizr.min.js"></script>
  <script id="documentation_options" data-url_root="../../../../../" src="../../../../../_static/documentation_options.js"></script>
  <script src="../../../../../_static/jquery.js"></script>
  <script src="../../../../../_static/underscore.js"></script>
  <script src="../../../../../_static/doctools.js"></script>
  <script src="../../../../../_static/language_data.js"></script>
  <!-- MathJax is the only script fetched asynchronously, from a third-party CDN. -->
  <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
  <script src="../../../../../_static/js/theme.js"></script>

  <!-- Theme stylesheet first, then the Pygments palette used by the highlighted listing. -->
  <link rel="stylesheet" href="../../../../../_static/css/theme.css">
  <link rel="stylesheet" href="../../../../../_static/pygments.css">

  <!-- Relative links to the general index and the search page. -->
  <link rel="index" title="Index" href="../../../../../genindex.html">
  <link rel="search" title="Search" href="../../../../../search.html">
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
  <div class="wy-side-scroll">
    <!-- Sidebar header: project home link, documented version and the search form. -->
    <div class="wy-side-nav-search">
      <a href="../../../../../index.html" class="icon icon-home"> Apache Beam
      </a>
      <div class="version">
        2.38.0
      </div>
      <div role="search">
        <form id="rtd-search-form" class="wy-form" action="../../../../../search.html" method="get">
          <!-- A placeholder is not an accessible name; aria-label gives the field one
               without adding a visible label. type="text" is left unchanged. -->
          <input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
          <input type="hidden" name="check_keywords" value="yes" />
          <input type="hidden" name="area" value="default" />
        </form>
      </div>
    </div>
    <!-- Table of contents: one list of packages followed by one list of top-level modules. -->
    <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.coders.html">apache_beam.coders package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.io.html">apache_beam.io package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.ml.html">apache_beam.ml package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.options.html">apache_beam.options package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.portability.html">apache_beam.portability package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.runners.html">apache_beam.runners package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.utils.html">apache_beam.utils package</a></li>
      </ul>
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.error.html">apache_beam.error module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
      </ul>
    </div>
  </div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
<!-- Top bar: a menu icon followed by a link back to the documentation home page. -->
<!-- NOTE(review): the <i> carries data-toggle="wy-nav-top", so it is presumably a click target wired up by the theme script; an <i> is not natively focusable — confirm keyboard access. -->
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
  <!-- Breadcrumb trail: docs root, then the module-code index, then the current module (not a link). -->
  <ul class="wy-breadcrumbs">
    <li><a href="../../../../../index.html">Docs</a> &raquo;</li>
    <li><a href="../../../../index.html">Module code</a> &raquo;</li>
    <li>apache_beam.runners.interactive.display.pcoll_visualization</li>
    <!-- Aside slot is emitted empty on this page. -->
    <li class="wy-breadcrumbs-aside">
    </li>
  </ul>
  <hr>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.runners.interactive.display.pcoll_visualization</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Module visualizes PCollection data.</span>
<span class="sd">For internal use only; no backwards-compatibility guarantees.</span>
<span class="sd">Only works with Python 3.5+.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="c1"># pytype: skip-file</span>
<span class="kn">import</span> <span class="nn">base64</span>
<span class="kn">import</span> <span class="nn">datetime</span>
<span class="kn">import</span> <span class="nn">html</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">timedelta</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Optional</span>
<span class="kn">from</span> <span class="nn">dateutil</span> <span class="kn">import</span> <span class="n">tz</span>
<span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.utils</span> <span class="kn">import</span> <span class="n">elements_to_df</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">GlobalWindow</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">IntervalWindow</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">IPython</span> <span class="kn">import</span> <span class="n">get_ipython</span> <span class="c1"># pylint: disable=import-error</span>
<span class="kn">from</span> <span class="nn">IPython.core.display</span> <span class="kn">import</span> <span class="n">HTML</span> <span class="c1"># pylint: disable=import-error</span>
<span class="kn">from</span> <span class="nn">IPython.core.display</span> <span class="kn">import</span> <span class="n">Javascript</span> <span class="c1"># pylint: disable=import-error</span>
<span class="kn">from</span> <span class="nn">IPython.core.display</span> <span class="kn">import</span> <span class="n">display</span> <span class="c1"># pylint: disable=import-error</span>
<span class="kn">from</span> <span class="nn">IPython.core.display</span> <span class="kn">import</span> <span class="n">display_javascript</span> <span class="c1"># pylint: disable=import-error</span>
<span class="kn">from</span> <span class="nn">facets_overview.generic_feature_statistics_generator</span> <span class="kn">import</span> <span class="n">GenericFeatureStatisticsGenerator</span> <span class="c1"># pylint: disable=import-error</span>
<span class="kn">from</span> <span class="nn">timeloop</span> <span class="kn">import</span> <span class="n">Timeloop</span> <span class="c1"># pylint: disable=import-error</span>
<span class="k">if</span> <span class="n">get_ipython</span><span class="p">():</span>
<span class="n">_pcoll_visualization_ready</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">_pcoll_visualization_ready</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="n">_pcoll_visualization_ready</span> <span class="o">=</span> <span class="kc">False</span>
<span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>
<span class="n">_CSS</span> <span class="o">=</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> &lt;style&gt;</span>
<span class="s2"> .p-Widget.jp-OutputPrompt.jp-OutputArea-prompt:empty {{</span>
<span class="s2"> padding: 0;</span>
<span class="s2"> border: 0;</span>
<span class="s2"> }}</span>
<span class="s2"> .p-Widget.jp-RenderedJavaScript.jp-mod-trusted.jp-OutputArea-output:empty {{</span>
<span class="s2"> padding: 0;</span>
<span class="s2"> border: 0;</span>
<span class="s2"> }}</span>
<span class="s2"> &lt;/style&gt;&quot;&quot;&quot;</span>
<span class="n">_DIVE_SCRIPT_TEMPLATE</span> <span class="o">=</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> try {{</span>
<span class="s2"> document</span>
<span class="s2"> .getElementById(&quot;</span><span class="si">{display_id}</span><span class="s2">&quot;)</span>
<span class="s2"> .contentDocument</span>
<span class="s2"> .getElementById(&quot;</span><span class="si">{display_id}</span><span class="s2">&quot;)</span>
<span class="s2"> .data = </span><span class="si">{jsonstr}</span><span class="s2">;</span>
<span class="s2"> }} catch (e) {{</span>
<span class="s2"> // NOOP when the user has cleared the output from the notebook.</span>
<span class="s2"> }}&quot;&quot;&quot;</span>
<span class="n">_DIVE_HTML_TEMPLATE</span> <span class="o">=</span> <span class="n">_CSS</span> <span class="o">+</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> &lt;iframe id=</span><span class="si">{display_id}</span><span class="s2"> style=&quot;border:none&quot; width=&quot;100%&quot; height=&quot;600px&quot;</span>
<span class="s2"> srcdoc=&#39;</span>
<span class="s2"> &lt;script src=&quot;https://cdnjs.cloudflare.com/ajax/libs/webcomponentsjs/1.3.3/webcomponents-lite.js&quot;&gt;&lt;/script&gt;</span>
<span class="s2"> &lt;link rel=&quot;import&quot; href=&quot;https://raw.githubusercontent.com/PAIR-code/facets/1.0.0/facets-dist/facets-jupyter.html&quot;&gt;</span>
<span class="s2"> &lt;facets-dive sprite-image-width=&quot;</span><span class="si">{sprite_size}</span><span class="s2">&quot; sprite-image-height=&quot;</span><span class="si">{sprite_size}</span><span class="s2">&quot; id=&quot;</span><span class="si">{display_id}</span><span class="s2">&quot; height=&quot;600&quot;&gt;&lt;/facets-dive&gt;</span>
<span class="s2"> &lt;script&gt;</span>
<span class="s2"> document.getElementById(&quot;</span><span class="si">{display_id}</span><span class="s2">&quot;).data = </span><span class="si">{jsonstr}</span><span class="s2">;</span>
<span class="s2"> &lt;/script&gt;</span>
<span class="s2"> &#39;&gt;</span>
<span class="s2"> &lt;/iframe&gt;&quot;&quot;&quot;</span>
<span class="n">_OVERVIEW_SCRIPT_TEMPLATE</span> <span class="o">=</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> try {{</span>
<span class="s2"> document</span>
<span class="s2"> .getElementById(&quot;</span><span class="si">{display_id}</span><span class="s2">&quot;)</span>
<span class="s2"> .contentDocument</span>
<span class="s2"> .getElementById(&quot;</span><span class="si">{display_id}</span><span class="s2">&quot;)</span>
<span class="s2"> .protoInput = &quot;</span><span class="si">{protostr}</span><span class="s2">&quot;;</span>
<span class="s2"> }} catch (e) {{</span>
<span class="s2"> // NOOP when the user has cleared the output from the notebook.</span>
<span class="s2"> }}&quot;&quot;&quot;</span>
<span class="n">_OVERVIEW_HTML_TEMPLATE</span> <span class="o">=</span> <span class="n">_CSS</span> <span class="o">+</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> &lt;iframe id=</span><span class="si">{display_id}</span><span class="s2"> style=&quot;border:none&quot; width=&quot;100%&quot; height=&quot;600px&quot;</span>
<span class="s2"> srcdoc=&#39;</span>
<span class="s2"> &lt;script src=&quot;https://cdnjs.cloudflare.com/ajax/libs/webcomponentsjs/1.3.3/webcomponents-lite.js&quot;&gt;&lt;/script&gt;</span>
<span class="s2"> &lt;link rel=&quot;import&quot; href=&quot;https://raw.githubusercontent.com/PAIR-code/facets/1.0.0/facets-dist/facets-jupyter.html&quot;&gt;</span>
<span class="s2"> &lt;facets-overview id=&quot;</span><span class="si">{display_id}</span><span class="s2">&quot;&gt;&lt;/facets-overview&gt;</span>
<span class="s2"> &lt;script&gt;</span>
<span class="s2"> document.getElementById(&quot;</span><span class="si">{display_id}</span><span class="s2">&quot;).protoInput = &quot;</span><span class="si">{protostr}</span><span class="s2">&quot;;</span>
<span class="s2"> &lt;/script&gt;</span>
<span class="s2"> &#39;&gt;</span>
<span class="s2"> &lt;/iframe&gt;&quot;&quot;&quot;</span>
<span class="n">_DATATABLE_INITIALIZATION_CONFIG</span> <span class="o">=</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> bAutoWidth: false,</span>
<span class="s2"> columns: </span><span class="si">{columns}</span><span class="s2">,</span>
<span class="s2"> destroy: true,</span>
<span class="s2"> responsive: true,</span>
<span class="s2"> columnDefs: [</span>
<span class="s2"> {{</span>
<span class="s2"> targets: &quot;_all&quot;,</span>
<span class="s2"> className: &quot;dt-left&quot;</span>
<span class="s2"> }},</span>
<span class="s2"> {{</span>
<span class="s2"> &quot;targets&quot;: 0,</span>
<span class="s2"> &quot;width&quot;: &quot;10px&quot;,</span>
<span class="s2"> &quot;title&quot;: &quot;&quot;</span>
<span class="s2"> }}</span>
<span class="s2"> ]&quot;&quot;&quot;</span>
<span class="n">_DATAFRAME_SCRIPT_TEMPLATE</span> <span class="o">=</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> var dt;</span>
<span class="s2"> if ($.fn.dataTable.isDataTable(&quot;#</span><span class="si">{table_id}</span><span class="s2">&quot;)) {{</span>
<span class="s2"> dt = $(&quot;#</span><span class="si">{table_id}</span><span class="s2">&quot;).dataTable();</span>
<span class="s2"> }} else if ($(&quot;#</span><span class="si">{table_id}</span><span class="s2">_wrapper&quot;).length == 0) {{</span>
<span class="s2"> dt = $(&quot;#</span><span class="si">{table_id}</span><span class="s2">&quot;).dataTable({{</span>
<span class="s2"> &quot;&quot;&quot;</span> <span class="o">+</span> <span class="n">_DATATABLE_INITIALIZATION_CONFIG</span> <span class="o">+</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> }});</span>
<span class="s2"> }} else {{</span>
<span class="s2"> return;</span>
<span class="s2"> }}</span>
<span class="s2"> dt.api()</span>
<span class="s2"> .clear()</span>
<span class="s2"> .rows.add(</span><span class="si">{data_as_rows}</span><span class="s2">)</span>
<span class="s2"> .draw(&#39;full-hold&#39;);&quot;&quot;&quot;</span>
<span class="n">_DATAFRAME_PAGINATION_TEMPLATE</span> <span class="o">=</span> <span class="n">_CSS</span> <span class="o">+</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> &lt;link rel=&quot;stylesheet&quot; href=&quot;https://cdn.datatables.net/1.10.20/css/jquery.dataTables.min.css&quot;&gt;</span>
<span class="s2"> &lt;table id=&quot;</span><span class="si">{table_id}</span><span class="s2">&quot; class=&quot;display&quot; style=&quot;display:block&quot;&gt;&lt;/table&gt;</span>
<span class="s2"> &lt;script&gt;</span>
<span class="s2"> </span><span class="si">{script_in_jquery_with_datatable}</span><span class="s2"></span>
<span class="s2"> &lt;/script&gt;&quot;&quot;&quot;</span>
<span class="n">_NO_DATA_TEMPLATE</span> <span class="o">=</span> <span class="n">_CSS</span> <span class="o">+</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> &lt;div id=&quot;no_data_</span><span class="si">{id}</span><span class="s2">&quot;&gt;No data to display.&lt;/div&gt;&quot;&quot;&quot;</span>
<span class="n">_NO_DATA_REMOVAL_SCRIPT</span> <span class="o">=</span> <span class="s2">&quot;&quot;&quot;</span>
<span class="s2"> $(&quot;#no_data_</span><span class="si">{id}</span><span class="s2">&quot;).remove();&quot;&quot;&quot;</span>
<div class="viewcode-block" id="visualize"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.visualize">[docs]</a><span class="k">def</span> <span class="nf">visualize</span><span class="p">(</span>
<span class="n">stream</span><span class="p">,</span>
<span class="n">dynamic_plotting_interval</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">include_window_info</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">display_facets</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">element_type</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Visualizes the data of a given PCollection. Optionally enables dynamic</span>
<span class="sd"> plotting with interval in seconds if the PCollection is being produced by a</span>
<span class="sd"> running pipeline or the pipeline is streaming indefinitely. The function</span>
<span class="sd"> always returns immediately and is asynchronous when dynamic plotting is on.</span>
<span class="sd"> If dynamic plotting is enabled, the visualization is updated continuously until</span>
<span class="sd"> the pipeline producing the PCollection is in an end state. The visualization</span>
<span class="sd"> would be anchored to the notebook cell output area. The function</span>
<span class="sd"> asynchronously returns a handle to the visualization job immediately. The user</span>
<span class="sd"> could manually do::</span>
<span class="sd"> # In one notebook cell, enable dynamic plotting every 1 second:</span>
<span class="sd"> handle = visualize(pcoll, dynamic_plotting_interval=1)</span>
<span class="sd"> # Visualization anchored to the cell&#39;s output area.</span>
<span class="sd"> # In a different cell:</span>
<span class="sd"> handle.stop()</span>
<span class="sd"> # Will stop the dynamic plotting of the above visualization manually.</span>
<span class="sd"> # Otherwise, dynamic plotting ends when pipeline is not running anymore.</span>
<span class="sd"> If dynamic_plotting_interval is not set (the default), None is returned.</span>
<span class="sd"> If include_window_info is True, the data will include window information,</span>
<span class="sd"> which consists of the event timestamps, windows, and pane info.</span>
<span class="sd"> If display_facets is True, the facets widgets will be rendered. Otherwise, the</span>
<span class="sd"> facets widgets will not be rendered.</span>
<span class="sd"> The function is experimental. For internal use only; no</span>
<span class="sd"> backwards-compatibility guarantees.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">_pcoll_visualization_ready</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">None</span>
<span class="n">pv</span> <span class="o">=</span> <span class="n">PCollectionVisualization</span><span class="p">(</span>
<span class="n">stream</span><span class="p">,</span>
<span class="n">include_window_info</span><span class="o">=</span><span class="n">include_window_info</span><span class="p">,</span>
<span class="n">display_facets</span><span class="o">=</span><span class="n">display_facets</span><span class="p">,</span>
<span class="n">element_type</span><span class="o">=</span><span class="n">element_type</span><span class="p">)</span>
<span class="k">if</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">is_in_notebook</span><span class="p">:</span>
<span class="n">pv</span><span class="o">.</span><span class="n">display</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">pv</span><span class="o">.</span><span class="n">display_plain_text</span><span class="p">()</span>
<span class="c1"># We don&#39;t want to do dynamic plotting if there is no notebook frontend.</span>
<span class="k">return</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">dynamic_plotting_interval</span><span class="p">:</span>
<span class="c1"># Disables the verbose logging from timeloop.</span>
<span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="s1">&#39;timeloop&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">disabled</span> <span class="o">=</span> <span class="kc">True</span>
<span class="n">tl</span> <span class="o">=</span> <span class="n">Timeloop</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">dynamic_plotting</span><span class="p">(</span><span class="n">stream</span><span class="p">,</span> <span class="n">pv</span><span class="p">,</span> <span class="n">tl</span><span class="p">,</span> <span class="n">include_window_info</span><span class="p">,</span> <span class="n">display_facets</span><span class="p">):</span>
<span class="nd">@tl</span><span class="o">.</span><span class="n">job</span><span class="p">(</span><span class="n">interval</span><span class="o">=</span><span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">dynamic_plotting_interval</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">continuous_update_display</span><span class="p">():</span> <span class="c1"># pylint: disable=unused-variable</span>
<span class="c1"># Always creates a new PCollVisualization instance when the</span>
<span class="c1"># PCollection materialization is being updated and dynamic</span>
<span class="c1"># plotting is in-process.</span>
<span class="c1"># PCollectionVisualization created at this level doesn&#39;t need dynamic</span>
<span class="c1"># plotting interval information when instantiated because it&#39;s already</span>
<span class="c1"># in dynamic plotting logic.</span>
<span class="n">updated_pv</span> <span class="o">=</span> <span class="n">PCollectionVisualization</span><span class="p">(</span>
<span class="n">stream</span><span class="p">,</span>
<span class="n">include_window_info</span><span class="o">=</span><span class="n">include_window_info</span><span class="p">,</span>
<span class="n">display_facets</span><span class="o">=</span><span class="n">display_facets</span><span class="p">,</span>
<span class="n">element_type</span><span class="o">=</span><span class="n">element_type</span><span class="p">)</span>
<span class="n">updated_pv</span><span class="o">.</span><span class="n">display</span><span class="p">(</span><span class="n">updating_pv</span><span class="o">=</span><span class="n">pv</span><span class="p">)</span>
<span class="c1"># Stop updating the visualizations as soon as the stream will not yield</span>
<span class="c1"># new elements.</span>
<span class="k">if</span> <span class="n">stream</span><span class="o">.</span><span class="n">is_done</span><span class="p">():</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">tl</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="k">except</span> <span class="ne">RuntimeError</span><span class="p">:</span>
<span class="c1"># The job can only be stopped once. Ignore excessive stops.</span>
<span class="k">pass</span>
<span class="n">tl</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
<span class="k">return</span> <span class="n">tl</span>
<span class="k">return</span> <span class="n">dynamic_plotting</span><span class="p">(</span><span class="n">stream</span><span class="p">,</span> <span class="n">pv</span><span class="p">,</span> <span class="n">tl</span><span class="p">,</span> <span class="n">include_window_info</span><span class="p">,</span> <span class="n">display_facets</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">None</span></div>
<div class="viewcode-block" id="visualize_computed_pcoll"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.visualize_computed_pcoll">[docs]</a><span class="k">def</span> <span class="nf">visualize_computed_pcoll</span><span class="p">(</span>
<span class="n">pcoll_name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">pcoll</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">,</span>
<span class="n">max_n</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">max_duration_secs</span><span class="p">:</span> <span class="nb">float</span><span class="p">,</span>
<span class="n">dynamic_plotting_interval</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">include_window_info</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="n">display_facets</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="sd">&quot;&quot;&quot;A simple visualize alternative.</span>
<span class="sd"> When the pcoll_name and pcoll pair identifies a watched and computed</span>
<span class="sd"> PCollection in the current interactive environment without ambiguity, an</span>
<span class="sd"> ElementStream can be built directly from cache. Returns immediately, the</span>
<span class="sd"> visualization is asynchronous, but guaranteed to end in the near future.</span>
<span class="sd"> Args:</span>
<span class="sd"> pcoll_name: the variable name of the PCollection.</span>
<span class="sd"> pcoll: the PCollection to be visualized.</span>
<span class="sd"> max_n: the maximum number of elements to visualize.</span>
<span class="sd"> max_duration_secs: max duration of elements to read in seconds.</span>
<span class="sd"> dynamic_plotting_interval: the interval in seconds between visualization</span>
<span class="sd"> updates if provided; otherwise, no dynamic plotting.</span>
<span class="sd"> include_window_info: whether to include windowing info in the elements.</span>
<span class="sd"> display_facets: whether to display the facets widgets.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">pipeline</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">user_pipeline</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span>
<span class="n">rm</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">get_recording_manager</span><span class="p">(</span><span class="n">pipeline</span><span class="p">,</span> <span class="n">create_if_absent</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="n">stream</span> <span class="o">=</span> <span class="n">rm</span><span class="o">.</span><span class="n">read</span><span class="p">(</span>
<span class="n">pcoll_name</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">,</span> <span class="n">max_n</span><span class="o">=</span><span class="n">max_n</span><span class="p">,</span> <span class="n">max_duration_secs</span><span class="o">=</span><span class="n">max_duration_secs</span><span class="p">)</span>
<span class="k">if</span> <span class="n">stream</span><span class="p">:</span>
<span class="n">visualize</span><span class="p">(</span>
<span class="n">stream</span><span class="p">,</span>
<span class="n">dynamic_plotting_interval</span><span class="o">=</span><span class="n">dynamic_plotting_interval</span><span class="p">,</span>
<span class="n">include_window_info</span><span class="o">=</span><span class="n">include_window_info</span><span class="p">,</span>
<span class="n">display_facets</span><span class="o">=</span><span class="n">display_facets</span><span class="p">,</span>
<span class="n">element_type</span><span class="o">=</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">)</span></div>
<div class="viewcode-block" id="PCollectionVisualization"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.PCollectionVisualization">[docs]</a><span class="k">class</span> <span class="nc">PCollectionVisualization</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A visualization of a PCollection.</span>
<span class="sd"> The class relies on creating a PipelineInstrument w/o actual instrument to</span>
<span class="sd"> access current interactive environment for materialized PCollection data at</span>
<span class="sd"> the moment of self instantiation through cache.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">stream</span><span class="p">,</span>
<span class="n">include_window_info</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">display_facets</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">element_type</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">assert</span> <span class="n">_pcoll_visualization_ready</span><span class="p">,</span> <span class="p">(</span>
<span class="s1">&#39;Dependencies for PCollection visualization are not available. Please &#39;</span>
<span class="s1">&#39;use `pip install apache-beam[interactive]` to install necessary &#39;</span>
<span class="s1">&#39;dependencies and make sure that you are executing code in an &#39;</span>
<span class="s1">&#39;interactive environment such as a Jupyter notebook.&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_stream</span> <span class="o">=</span> <span class="n">stream</span>
<span class="c1"># Variable name as the title for element value in the rendered data table.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pcoll_var</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="n">var</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pcoll_var</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pcoll_var</span> <span class="o">=</span> <span class="s1">&#39;Value&#39;</span>
<span class="n">obfuscated_id</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="n">display_id</span><span class="p">(</span><span class="nb">id</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_dive_display_id</span> <span class="o">=</span> <span class="s1">&#39;facets_dive_</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">obfuscated_id</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_overview_display_id</span> <span class="o">=</span> <span class="s1">&#39;facets_overview_</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">obfuscated_id</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_df_display_id</span> <span class="o">=</span> <span class="s1">&#39;df_</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">obfuscated_id</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_include_window_info</span> <span class="o">=</span> <span class="n">include_window_info</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_display_facets</span> <span class="o">=</span> <span class="n">display_facets</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_is_datatable_empty</span> <span class="o">=</span> <span class="kc">True</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_element_type</span> <span class="o">=</span> <span class="n">element_type</span>
<div class="viewcode-block" id="PCollectionVisualization.display_plain_text"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.PCollectionVisualization.display_plain_text">[docs]</a> <span class="k">def</span> <span class="nf">display_plain_text</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Displays a head sample of the normalized PCollection data.</span>
<span class="sd"> This function is used when the ipython kernel is not connected to a</span>
<span class="sd"> notebook frontend such as when running ipython in terminal or in unit tests.</span>
<span class="sd"> It&#39;s a visualization in terminal-like UI, not a function to retrieve data</span>
<span class="sd"> for programmatic usage.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Double check if the dependency is ready in case someone mistakenly uses</span>
<span class="c1"># the function.</span>
<span class="k">if</span> <span class="n">_pcoll_visualization_ready</span><span class="p">:</span>
<span class="n">data</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_to_dataframe</span><span class="p">()</span>
<span class="c1"># Displays a data-table with at most 25 entries from the head.</span>
<span class="n">data_sample</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">head</span><span class="p">(</span><span class="mi">25</span><span class="p">)</span>
<span class="n">display</span><span class="p">(</span><span class="n">data_sample</span><span class="p">)</span></div>
<div class="viewcode-block" id="PCollectionVisualization.display"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.PCollectionVisualization.display">[docs]</a> <span class="k">def</span> <span class="nf">display</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">updating_pv</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Displays the visualization through IPython.</span>
<span class="sd"> Args:</span>
<span class="sd"> updating_pv: A PCollectionVisualization object. When provided, the</span>
<span class="sd"> display_id of each visualization part will inherit from the initial</span>
<span class="sd"> display of updating_pv and only update that visualization web element</span>
<span class="sd"> instead of creating new ones.</span>
<span class="sd"> The visualization has 3 parts: facets-dive, facets-overview and paginated</span>
<span class="sd"> data table. Each part is assigned an auto-generated unique display id</span>
<span class="sd"> (the uniqueness is guaranteed throughout the lifespan of the PCollection</span>
<span class="sd"> variable).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Ensures that dive, overview and table render the same data because the</span>
<span class="c1"># materialized PCollection data might be updated continuously.</span>
<span class="n">data</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_to_dataframe</span><span class="p">()</span>
<span class="c1"># Prefix integer column names with the PCollection variable name when</span>
<span class="c1"># visualizing; other column names are kept as they are.</span>
<span class="n">data</span><span class="o">.</span><span class="n">columns</span> <span class="o">=</span> <span class="p">[</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pcoll_var</span> <span class="o">+</span> <span class="s1">&#39;.&#39;</span> <span class="o">+</span>
<span class="nb">str</span><span class="p">(</span><span class="n">column</span><span class="p">)</span> <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">column</span><span class="p">,</span> <span class="nb">int</span><span class="p">)</span> <span class="k">else</span> <span class="n">column</span>
<span class="k">for</span> <span class="n">column</span> <span class="ow">in</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span>
<span class="p">]</span>
<span class="c1"># String-ify the dictionaries for display because elements of type dict</span>
<span class="c1"># cannot be ordered.</span>
<span class="n">data</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">applymap</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">x</span><span class="p">)</span> <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="nb">dict</span><span class="p">)</span> <span class="k">else</span> <span class="n">x</span><span class="p">)</span>
<span class="k">if</span> <span class="n">updating_pv</span><span class="p">:</span>
<span class="c1"># Only updates when data is not empty. Otherwise, consider it a bad</span>
<span class="c1"># iteration and noop since there is nothing to be updated.</span>
<span class="k">if</span> <span class="n">data</span><span class="o">.</span><span class="n">empty</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;Skip a visualization update due to empty data.&#39;</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_display_dataframe</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">deep</span><span class="o">=</span><span class="kc">True</span><span class="p">),</span> <span class="n">updating_pv</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_display_facets</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_display_dive</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">deep</span><span class="o">=</span><span class="kc">True</span><span class="p">),</span> <span class="n">updating_pv</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_display_overview</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">deep</span><span class="o">=</span><span class="kc">True</span><span class="p">),</span> <span class="n">updating_pv</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_display_dataframe</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">deep</span><span class="o">=</span><span class="kc">True</span><span class="p">))</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_display_facets</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_display_dive</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">deep</span><span class="o">=</span><span class="kc">True</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_display_overview</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">deep</span><span class="o">=</span><span class="kc">True</span><span class="p">))</span></div>
<span class="k">def</span> <span class="nf">_display_dive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">data</span><span class="p">,</span> <span class="n">update</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="n">sprite_size</span> <span class="o">=</span> <span class="mi">32</span> <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">index</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">50000</span> <span class="k">else</span> <span class="mi">64</span>
<span class="n">format_window_info_in_dataframe</span><span class="p">(</span><span class="n">data</span><span class="p">)</span>
<span class="n">jsonstr</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">to_json</span><span class="p">(</span><span class="n">orient</span><span class="o">=</span><span class="s1">&#39;records&#39;</span><span class="p">,</span> <span class="n">default_handler</span><span class="o">=</span><span class="nb">str</span><span class="p">)</span>
<span class="k">if</span> <span class="n">update</span><span class="p">:</span>
<span class="n">script</span> <span class="o">=</span> <span class="n">_DIVE_SCRIPT_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">display_id</span><span class="o">=</span><span class="n">update</span><span class="o">.</span><span class="n">_dive_display_id</span><span class="p">,</span> <span class="n">jsonstr</span><span class="o">=</span><span class="n">jsonstr</span><span class="p">)</span>
<span class="n">display_javascript</span><span class="p">(</span><span class="n">Javascript</span><span class="p">(</span><span class="n">script</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">html_str</span> <span class="o">=</span> <span class="n">_DIVE_HTML_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">display_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_dive_display_id</span><span class="p">,</span>
<span class="n">jsonstr</span><span class="o">=</span><span class="n">html</span><span class="o">.</span><span class="n">escape</span><span class="p">(</span><span class="n">jsonstr</span><span class="p">),</span>
<span class="n">sprite_size</span><span class="o">=</span><span class="n">sprite_size</span><span class="p">)</span>
<span class="n">display</span><span class="p">(</span><span class="n">HTML</span><span class="p">(</span><span class="n">html_str</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">_display_overview</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">data</span><span class="p">,</span> <span class="n">update</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="p">(</span><span class="ow">not</span> <span class="n">data</span><span class="o">.</span><span class="n">empty</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">_include_window_info</span> <span class="ow">and</span>
<span class="nb">all</span><span class="p">(</span><span class="n">column</span> <span class="ow">in</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span>
<span class="k">for</span> <span class="n">column</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">&#39;event_time&#39;</span><span class="p">,</span> <span class="s1">&#39;windows&#39;</span><span class="p">,</span> <span class="s1">&#39;pane_info&#39;</span><span class="p">))):</span>
<span class="n">data</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">drop</span><span class="p">([</span><span class="s1">&#39;event_time&#39;</span><span class="p">,</span> <span class="s1">&#39;windows&#39;</span><span class="p">,</span> <span class="s1">&#39;pane_info&#39;</span><span class="p">],</span> <span class="n">axis</span><span class="o">=</span><span class="mi">1</span><span class="p">)</span>
<span class="c1"># GFSG expects all column names to be strings.</span>
<span class="n">data</span><span class="o">.</span><span class="n">columns</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span><span class="o">.</span><span class="n">astype</span><span class="p">(</span><span class="nb">str</span><span class="p">)</span>
<span class="n">gfsg</span> <span class="o">=</span> <span class="n">GenericFeatureStatisticsGenerator</span><span class="p">()</span>
<span class="n">proto</span> <span class="o">=</span> <span class="n">gfsg</span><span class="o">.</span><span class="n">ProtoFromDataFrames</span><span class="p">([{</span><span class="s1">&#39;name&#39;</span><span class="p">:</span> <span class="s1">&#39;data&#39;</span><span class="p">,</span> <span class="s1">&#39;table&#39;</span><span class="p">:</span> <span class="n">data</span><span class="p">}])</span>
<span class="n">protostr</span> <span class="o">=</span> <span class="n">base64</span><span class="o">.</span><span class="n">b64encode</span><span class="p">(</span><span class="n">proto</span><span class="o">.</span><span class="n">SerializeToString</span><span class="p">())</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">update</span><span class="p">:</span>
<span class="n">script</span> <span class="o">=</span> <span class="n">_OVERVIEW_SCRIPT_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">display_id</span><span class="o">=</span><span class="n">update</span><span class="o">.</span><span class="n">_overview_display_id</span><span class="p">,</span> <span class="n">protostr</span><span class="o">=</span><span class="n">protostr</span><span class="p">)</span>
<span class="n">display_javascript</span><span class="p">(</span><span class="n">Javascript</span><span class="p">(</span><span class="n">script</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">html_str</span> <span class="o">=</span> <span class="n">_OVERVIEW_HTML_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">display_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_overview_display_id</span><span class="p">,</span> <span class="n">protostr</span><span class="o">=</span><span class="n">protostr</span><span class="p">)</span>
<span class="n">display</span><span class="p">(</span><span class="n">HTML</span><span class="p">(</span><span class="n">html_str</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">_display_dataframe</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">data</span><span class="p">,</span> <span class="n">update</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="n">table_id</span> <span class="o">=</span> <span class="s1">&#39;table_</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">update</span><span class="o">.</span><span class="n">_df_display_id</span> <span class="k">if</span> <span class="n">update</span> <span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df_display_id</span><span class="p">)</span>
<span class="n">columns</span> <span class="o">=</span> <span class="p">[{</span>
<span class="s1">&#39;title&#39;</span><span class="p">:</span> <span class="s1">&#39;&#39;</span>
<span class="p">}]</span> <span class="o">+</span> <span class="p">[{</span>
<span class="s1">&#39;title&#39;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">column</span><span class="p">)</span>
<span class="p">}</span> <span class="k">for</span> <span class="n">column</span> <span class="ow">in</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span><span class="p">]</span>
<span class="n">format_window_info_in_dataframe</span><span class="p">(</span><span class="n">data</span><span class="p">)</span>
<span class="c1"># Convert the dataframe into rows, each row looks like</span>
<span class="c1"># [column_1_val, column_2_val, ...].</span>
<span class="n">rows</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">applymap</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">x</span><span class="p">))</span><span class="o">.</span><span class="n">to_dict</span><span class="p">(</span><span class="s1">&#39;split&#39;</span><span class="p">)[</span><span class="s1">&#39;data&#39;</span><span class="p">]</span>
<span class="c1"># Convert each row into dict where keys are column index in the datatable</span>
<span class="c1"># to be rendered and values are data from the dataframe. Column index 0 is</span>
<span class="c1"># left out to hold the int index (not part of the data) from dataframe.</span>
<span class="c1"># Each row becomes: {1: column_1_val, 2: column_2_val, ...}.</span>
<span class="n">rows</span> <span class="o">=</span> <span class="p">[{</span><span class="n">k</span> <span class="o">+</span> <span class="mi">1</span><span class="p">:</span> <span class="n">v</span> <span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="n">row</span><span class="p">)}</span> <span class="k">for</span> <span class="n">row</span> <span class="ow">in</span> <span class="n">rows</span><span class="p">]</span>
<span class="c1"># Add the dataframe int index (used as default ordering column) to datatable</span>
<span class="c1"># column index 0 (will be rendered as the first column).</span>
<span class="c1"># Each row becomes:</span>
<span class="c1"># {1: column_1_val, 2: column_2_val, ..., 0: int_index_in_dataframe}.</span>
<span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">row</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="n">rows</span><span class="p">):</span>
<span class="n">row</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="o">=</span> <span class="n">k</span>
<span class="n">script</span> <span class="o">=</span> <span class="n">_DATAFRAME_SCRIPT_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">table_id</span><span class="o">=</span><span class="n">table_id</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="n">columns</span><span class="p">,</span> <span class="n">data_as_rows</span><span class="o">=</span><span class="n">rows</span><span class="p">)</span>
<span class="n">script_in_jquery_with_datatable</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">_JQUERY_WITH_DATATABLE_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">customized_script</span><span class="o">=</span><span class="n">script</span><span class="p">)</span>
<span class="c1"># Dynamically load data into the existing datatable if not empty.</span>
<span class="k">if</span> <span class="n">update</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">update</span><span class="o">.</span><span class="n">_is_datatable_empty</span><span class="p">:</span>
<span class="n">display_javascript</span><span class="p">(</span><span class="n">Javascript</span><span class="p">(</span><span class="n">script_in_jquery_with_datatable</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">if</span> <span class="n">data</span><span class="o">.</span><span class="n">empty</span><span class="p">:</span>
<span class="n">html_str</span> <span class="o">=</span> <span class="n">_NO_DATA_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">id</span><span class="o">=</span><span class="n">table_id</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">html_str</span> <span class="o">=</span> <span class="n">_DATAFRAME_PAGINATION_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">table_id</span><span class="o">=</span><span class="n">table_id</span><span class="p">,</span>
<span class="n">script_in_jquery_with_datatable</span><span class="o">=</span><span class="n">script_in_jquery_with_datatable</span><span class="p">)</span>
<span class="k">if</span> <span class="n">update</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">data</span><span class="o">.</span><span class="n">empty</span><span class="p">:</span>
<span class="c1"># Initialize a datatable to replace the existing no data div.</span>
<span class="n">display</span><span class="p">(</span>
<span class="n">Javascript</span><span class="p">(</span>
<span class="n">ie</span><span class="o">.</span><span class="n">_JQUERY_WITH_DATATABLE_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">customized_script</span><span class="o">=</span><span class="n">_NO_DATA_REMOVAL_SCRIPT</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="nb">id</span><span class="o">=</span><span class="n">table_id</span><span class="p">))))</span>
<span class="n">display</span><span class="p">(</span><span class="n">HTML</span><span class="p">(</span><span class="n">html_str</span><span class="p">),</span> <span class="n">display_id</span><span class="o">=</span><span class="n">update</span><span class="o">.</span><span class="n">_df_display_id</span><span class="p">)</span>
<span class="n">update</span><span class="o">.</span><span class="n">_is_datatable_empty</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">display</span><span class="p">(</span><span class="n">HTML</span><span class="p">(</span><span class="n">html_str</span><span class="p">),</span> <span class="n">display_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_df_display_id</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">data</span><span class="o">.</span><span class="n">empty</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_is_datatable_empty</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">def</span> <span class="nf">_to_dataframe</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">results</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_stream</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="n">tail</span><span class="o">=</span><span class="kc">False</span><span class="p">))</span>
<span class="k">return</span> <span class="n">elements_to_df</span><span class="p">(</span>
<span class="n">results</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_include_window_info</span><span class="p">,</span> <span class="n">element_type</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_element_type</span><span class="p">)</span></div>
<div class="viewcode-block" id="format_window_info_in_dataframe"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.format_window_info_in_dataframe">[docs]</a><span class="k">def</span> <span class="nf">format_window_info_in_dataframe</span><span class="p">(</span><span class="n">data</span><span class="p">):</span>
<span class="k">if</span> <span class="s1">&#39;event_time&#39;</span> <span class="ow">in</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span><span class="p">:</span>
<span class="n">data</span><span class="p">[</span><span class="s1">&#39;event_time&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">data</span><span class="p">[</span><span class="s1">&#39;event_time&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">apply</span><span class="p">(</span><span class="n">event_time_formatter</span><span class="p">)</span>
<span class="k">if</span> <span class="s1">&#39;windows&#39;</span> <span class="ow">in</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span><span class="p">:</span>
<span class="n">data</span><span class="p">[</span><span class="s1">&#39;windows&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">data</span><span class="p">[</span><span class="s1">&#39;windows&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">apply</span><span class="p">(</span><span class="n">windows_formatter</span><span class="p">)</span>
<span class="k">if</span> <span class="s1">&#39;pane_info&#39;</span> <span class="ow">in</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span><span class="p">:</span>
<span class="n">data</span><span class="p">[</span><span class="s1">&#39;pane_info&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">data</span><span class="p">[</span><span class="s1">&#39;pane_info&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">apply</span><span class="p">(</span><span class="n">pane_info_formatter</span><span class="p">)</span></div>
<!--
  Highlighted source listing of event_time_formatter(event_time_us). The
  [docs] anchor links back to this function's entry on the module API page.
  As listed: the argument is divided by 1000000 and passed to
  datetime.datetime.utcfromtimestamp, the result is tagged as UTC, converted
  to options.display_timezone and rendered with
  options.display_timestamp_format (options come from ie.current_env()).
  If that raises ValueError the function returns 'Min Timestamp' for a
  negative argument and 'Max Timestamp' otherwise.
  NOTE(review): the parameter name suggests microseconds since the epoch;
  confirm against callers.
  This block lives inside a pre element, so this comment is closed on the
  same line as the div to avoid adding rendered whitespace.
--><div class="viewcode-block" id="event_time_formatter"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.event_time_formatter">[docs]</a><span class="k">def</span> <span class="nf">event_time_formatter</span><span class="p">(</span><span class="n">event_time_us</span><span class="p">):</span>
<span class="n">options</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">options</span>
<span class="n">to_tz</span> <span class="o">=</span> <span class="n">options</span><span class="o">.</span><span class="n">display_timezone</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">datetime</span><span class="o">.</span><span class="n">datetime</span><span class="o">.</span><span class="n">utcfromtimestamp</span><span class="p">(</span><span class="n">event_time_us</span> <span class="o">/</span> <span class="mi">1000000</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span>
<span class="n">tzinfo</span><span class="o">=</span><span class="n">tz</span><span class="o">.</span><span class="n">tzutc</span><span class="p">())</span><span class="o">.</span><span class="n">astimezone</span><span class="p">(</span><span class="n">to_tz</span><span class="p">)</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span>
<span class="n">options</span><span class="o">.</span><span class="n">display_timestamp_format</span><span class="p">))</span>
<span class="k">except</span> <span class="ne">ValueError</span><span class="p">:</span>
<span class="k">if</span> <span class="n">event_time_us</span> <span class="o">&lt;</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">return</span> <span class="s1">&#39;Min Timestamp&#39;</span>
<span class="k">return</span> <span class="s1">&#39;Max Timestamp&#39;</span></div>
<!--
  Highlighted source listing of windows_formatter(windows). The [docs]
  anchor links back to this function's entry on the module API page.
  As listed: each element of windows that is a GlobalWindow contributes
  str(w); each IntervalWindow contributes 'START (DURATION)', where START is
  event_time_formatter(w.start.micros) and DURATION is built from
  w.end.micros minus w.start.micros, split into hours, minutes, seconds and
  a six-digit microsecond fraction. The pieces are joined with ','.
  No branch is listed for other window types.
  NOTE(review): indentation is not visible in this view; the micros branch
  appears to be nested under the 'if seconds' test, which would mean a
  sub-second remainder is only printed when whole seconds are non-zero.
  Confirm against the Python module.
  This block lives inside a pre element, so this comment is closed on the
  same line as the div to avoid adding rendered whitespace.
--><div class="viewcode-block" id="windows_formatter"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.windows_formatter">[docs]</a><span class="k">def</span> <span class="nf">windows_formatter</span><span class="p">(</span><span class="n">windows</span><span class="p">):</span>
<span class="n">result</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">w</span> <span class="ow">in</span> <span class="n">windows</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">w</span><span class="p">,</span> <span class="n">GlobalWindow</span><span class="p">):</span>
<span class="n">result</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">w</span><span class="p">))</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">w</span><span class="p">,</span> <span class="n">IntervalWindow</span><span class="p">):</span>
<span class="c1"># First get the duration in terms of hours, minutes, seconds, and</span>
<span class="c1"># micros.</span>
<span class="n">duration</span> <span class="o">=</span> <span class="n">w</span><span class="o">.</span><span class="n">end</span><span class="o">.</span><span class="n">micros</span> <span class="o">-</span> <span class="n">w</span><span class="o">.</span><span class="n">start</span><span class="o">.</span><span class="n">micros</span>
<span class="n">duration_secs</span> <span class="o">=</span> <span class="n">duration</span> <span class="o">//</span> <span class="mi">1000000</span>
<span class="n">hours</span><span class="p">,</span> <span class="n">remainder</span> <span class="o">=</span> <span class="nb">divmod</span><span class="p">(</span><span class="n">duration_secs</span><span class="p">,</span> <span class="mi">3600</span><span class="p">)</span>
<span class="n">minutes</span><span class="p">,</span> <span class="n">seconds</span> <span class="o">=</span> <span class="nb">divmod</span><span class="p">(</span><span class="n">remainder</span><span class="p">,</span> <span class="mi">60</span><span class="p">)</span>
<span class="n">micros</span> <span class="o">=</span> <span class="p">(</span><span class="n">duration</span> <span class="o">-</span> <span class="n">duration_secs</span> <span class="o">*</span> <span class="mi">1000000</span><span class="p">)</span> <span class="o">%</span> <span class="mi">1000000</span>
<span class="c1"># Construct the duration string. Try and write the string in such a</span>
<span class="c1"># way that minimizes the amount of characters written.</span>
<span class="n">duration</span> <span class="o">=</span> <span class="s1">&#39;&#39;</span>
<span class="k">if</span> <span class="n">hours</span><span class="p">:</span>
<span class="n">duration</span> <span class="o">+=</span> <span class="s1">&#39;</span><span class="si">{}</span><span class="s1">h &#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">hours</span><span class="p">)</span>
<span class="k">if</span> <span class="n">minutes</span> <span class="ow">or</span> <span class="p">(</span><span class="n">hours</span> <span class="ow">and</span> <span class="n">seconds</span><span class="p">):</span>
<span class="n">duration</span> <span class="o">+=</span> <span class="s1">&#39;</span><span class="si">{}</span><span class="s1">m &#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">minutes</span><span class="p">)</span>
<span class="k">if</span> <span class="n">seconds</span><span class="p">:</span>
<span class="k">if</span> <span class="n">micros</span><span class="p">:</span>
<span class="n">duration</span> <span class="o">+=</span> <span class="s1">&#39;</span><span class="si">{}</span><span class="s1">.</span><span class="si">{:06}</span><span class="s1">s&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">seconds</span><span class="p">,</span> <span class="n">micros</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">duration</span> <span class="o">+=</span> <span class="s1">&#39;</span><span class="si">{}</span><span class="s1">s&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">seconds</span><span class="p">)</span>
<span class="n">start</span> <span class="o">=</span> <span class="n">event_time_formatter</span><span class="p">(</span><span class="n">w</span><span class="o">.</span><span class="n">start</span><span class="o">.</span><span class="n">micros</span><span class="p">)</span>
<span class="n">result</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;</span><span class="si">{}</span><span class="s1"> (</span><span class="si">{}</span><span class="s1">)&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">start</span><span class="p">,</span> <span class="n">duration</span><span class="p">))</span>
<span class="k">return</span> <span class="s1">&#39;,&#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">result</span><span class="p">)</span></div>
<!--
  Highlighted source listing of pane_info_formatter(pane_info). The [docs]
  anchor links back to this function's entry on the module API page.
  As listed: PaneInfo and PaneInfoTiming are imported from
  apache_beam.utils.windowed_value inside the function body, and the
  argument is asserted to be a PaneInfo. The result starts as
  'Pane ' followed by pane_info.index. A suffix is built from 'Final ' (when
  pane_info.is_last is truthy) plus PaneInfoTiming.to_string(timing),
  lower-cased then capitalized, but only when timing is EARLY or LATE.
  A non-empty suffix is appended after ': '.
  This block lives inside a pre element, so this comment is closed on the
  same line as the div to avoid adding rendered whitespace.
--><div class="viewcode-block" id="pane_info_formatter"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.pane_info_formatter">[docs]</a><span class="k">def</span> <span class="nf">pane_info_formatter</span><span class="p">(</span><span class="n">pane_info</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils.windowed_value</span> <span class="kn">import</span> <span class="n">PaneInfo</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils.windowed_value</span> <span class="kn">import</span> <span class="n">PaneInfoTiming</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pane_info</span><span class="p">,</span> <span class="n">PaneInfo</span><span class="p">)</span>
<span class="n">result</span> <span class="o">=</span> <span class="s1">&#39;Pane </span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">pane_info</span><span class="o">.</span><span class="n">index</span><span class="p">)</span>
<span class="n">timing_info</span> <span class="o">=</span> <span class="s1">&#39;</span><span class="si">{}{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="s1">&#39;Final &#39;</span> <span class="k">if</span> <span class="n">pane_info</span><span class="o">.</span><span class="n">is_last</span> <span class="k">else</span> <span class="s1">&#39;&#39;</span><span class="p">,</span>
<span class="n">PaneInfoTiming</span><span class="o">.</span><span class="n">to_string</span><span class="p">(</span><span class="n">pane_info</span><span class="o">.</span><span class="n">timing</span><span class="p">)</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span><span class="o">.</span><span class="n">capitalize</span><span class="p">()</span> <span class="k">if</span>
<span class="n">pane_info</span><span class="o">.</span><span class="n">timing</span> <span class="ow">in</span> <span class="p">(</span><span class="n">PaneInfoTiming</span><span class="o">.</span><span class="n">EARLY</span><span class="p">,</span> <span class="n">PaneInfoTiming</span><span class="o">.</span><span class="n">LATE</span><span class="p">)</span> <span class="k">else</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">timing_info</span><span class="p">:</span>
<span class="n">result</span> <span class="o">+=</span> <span class="s1">&#39;: &#39;</span> <span class="o">+</span> <span class="n">timing_info</span>
<span class="k">return</span> <span class="n">result</span></div>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
&copy; Copyright
</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<!--
  Theme start-up hook. A function passed to jQuery() is registered to run
  once the DOM is ready; the callback calls
  SphinxRtdTheme.Navigation.enable(true).
  Neither jQuery nor SphinxRtdTheme is defined on this page; presumably they
  come from _static/jquery.js and _static/js/theme.js loaded in the page
  head. The meaning of the 'true' argument is not visible here. TODO confirm
  against the theme source.
--><script type="text/javascript">
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>