| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <title>apache_beam.runners.interactive.display.pcoll_visualization — Apache Beam 2.47.0 documentation</title> |
| |
| <!-- Scripts, in load order. Local assets are addressed relative to the documentation root five directories up (see data-url_root below). --> |
| <script src="../../../../../_static/js/modernizr.min.js"></script> |
| <script id="documentation_options" data-url_root="../../../../../" src="../../../../../_static/documentation_options.js"></script> |
| <script src="../../../../../_static/jquery.js"></script> |
| <script src="../../../../../_static/underscore.js"></script> |
| <script src="../../../../../_static/doctools.js"></script> |
| <script src="../../../../../_static/language_data.js"></script> |
| <!-- MathJax is the only script fetched from a third-party CDN; it is loaded asynchronously. --> |
| <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
| <script src="../../../../../_static/js/theme.js"></script> |
| |
| <!-- Stylesheets, followed by the links to the general index and the search page. --> |
| <link rel="stylesheet" href="../../../../../_static/css/theme.css"> |
| <link rel="stylesheet" href="../../../../../_static/pygments.css"> |
| <link rel="index" title="Index" href="../../../../../genindex.html"> |
| <link rel="search" title="Search" href="../../../../../search.html"> |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search" > |
| |
| |
| |
| <a href="../../../../../index.html" class="icon icon-home"> Apache Beam |
| |
| |
| |
| </a> |
| |
| |
| |
| |
| <div class="version"> |
| 2.47.0 |
| </div> |
| |
| |
| |
| |
| <div role="search"> |
| <!-- Site search: sends the query (field "q") plus two fixed hidden fields as a GET request to the documentation's search page. --> |
| <form id="rtd-search-form" class="wy-form" action="../../../../../search.html" method="get"> |
| <!-- aria-label gives the field an accessible name; a placeholder on its own is not a label. --> |
| <input type="text" name="q" placeholder="Search docs" aria-label="Search docs" /> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| </div> |
| |
| |
| </div> |
| |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| <!-- Sidebar table of contents, labelled "main navigation". Every href is relative to the documentation root five directories up. --> |
| |
| |
| |
| |
| <!-- Two lists follow: the first links to the apache_beam package pages, the second to the top-level modules (error, pipeline, pvalue). --> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.io.html">apache_beam.io package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.ml.html">apache_beam.ml package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.options.html">apache_beam.options package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.testing.html">apache_beam.testing package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.yaml.html">apache_beam.yaml package</a></li> |
| </ul> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.error.html">apache_beam.error module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
| </ul> |
| |
| |
| |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
| <!-- Top bar: a menu icon followed by a link to the documentation home page. NOTE(review): the icon carries data-toggle="wy-nav-top" but has no accessible name and is not keyboard-focusable; presumably theme.js binds a click handler to it. Confirm before changing the markup. --> |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="../../../../../index.html">Apache Beam</a> |
| |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| <!-- Breadcrumb trail: Docs » Module code » current module (plain text, no link). The "Module code" link climbs one directory fewer than the "Docs" link, so the two point at different index.html files. --> |
| <ul class="wy-breadcrumbs"> |
| |
| <li><a href="../../../../../index.html">Docs</a> »</li> |
| |
| <li><a href="../../../../index.html">Module code</a> »</li> |
| |
| <li>apache_beam.runners.interactive.display.pcoll_visualization</li> |
| |
| |
| <li class="wy-breadcrumbs-aside"> |
| |
| </li> |
| |
| </ul> |
| |
| |
| <hr/> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.runners.interactive.display.pcoll_visualization</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="sd">"""Module visualizes PCollection data.</span> |
| |
| <span class="sd">For internal use only; no backwards-compatibility guarantees.</span> |
| <span class="sd">Only works with Python 3.5+.</span> |
| <span class="sd">"""</span> |
| <span class="c1"># pytype: skip-file</span> |
| |
| <span class="kn">import</span> <span class="nn">base64</span> |
| <span class="kn">import</span> <span class="nn">datetime</span> |
| <span class="kn">import</span> <span class="nn">html</span> |
| <span class="kn">import</span> <span class="nn">logging</span> |
| <span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">timedelta</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Optional</span> |
| |
| <span class="kn">from</span> <span class="nn">dateutil</span> <span class="kn">import</span> <span class="n">tz</span> |
| |
| <span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.utils</span> <span class="kn">import</span> <span class="n">elements_to_df</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">GlobalWindow</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">IntervalWindow</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">IPython</span> <span class="kn">import</span> <span class="n">get_ipython</span> <span class="c1"># pylint: disable=import-error</span> |
| <span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">HTML</span> <span class="c1"># pylint: disable=import-error</span> |
| <span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">Javascript</span> <span class="c1"># pylint: disable=import-error</span> |
| <span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">display</span> <span class="c1"># pylint: disable=import-error</span> |
| <span class="kn">from</span> <span class="nn">IPython.display</span> <span class="kn">import</span> <span class="n">display_javascript</span> <span class="c1"># pylint: disable=import-error</span> |
| <span class="kn">from</span> <span class="nn">facets_overview.generic_feature_statistics_generator</span> <span class="kn">import</span> <span class="n">GenericFeatureStatisticsGenerator</span> <span class="c1"># pylint: disable=import-error</span> |
| <span class="kn">from</span> <span class="nn">timeloop</span> <span class="kn">import</span> <span class="n">Timeloop</span> <span class="c1"># pylint: disable=import-error</span> |
| |
| <span class="k">if</span> <span class="n">get_ipython</span><span class="p">():</span> |
| <span class="n">_pcoll_visualization_ready</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">_pcoll_visualization_ready</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span> |
| <span class="n">_pcoll_visualization_ready</span> <span class="o">=</span> <span class="kc">False</span> |
| |
| <span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span> |
| |
| <span class="n">_CSS</span> <span class="o">=</span> <span class="s2">"""</span> |
| <span class="s2"> <style></span> |
| <span class="s2"> .p-Widget.jp-OutputPrompt.jp-OutputArea-prompt:empty {{</span> |
| <span class="s2"> padding: 0;</span> |
| <span class="s2"> border: 0;</span> |
| <span class="s2"> }}</span> |
| <span class="s2"> .p-Widget.jp-RenderedJavaScript.jp-mod-trusted.jp-OutputArea-output:empty {{</span> |
| <span class="s2"> padding: 0;</span> |
| <span class="s2"> border: 0;</span> |
| <span class="s2"> }}</span> |
| <span class="s2"> </style>"""</span> |
| <span class="n">_DIVE_SCRIPT_TEMPLATE</span> <span class="o">=</span> <span class="s2">"""</span> |
| <span class="s2"> try {{</span> |
| <span class="s2"> document</span> |
| <span class="s2"> .getElementById("</span><span class="si">{display_id}</span><span class="s2">")</span> |
| <span class="s2"> .contentDocument</span> |
| <span class="s2"> .getElementById("</span><span class="si">{display_id}</span><span class="s2">")</span> |
| <span class="s2"> .data = </span><span class="si">{jsonstr}</span><span class="s2">;</span> |
| <span class="s2"> }} catch (e) {{</span> |
| <span class="s2"> // NOOP when the user has cleared the output from the notebook.</span> |
| <span class="s2"> }}"""</span> |
| <span class="n">_DIVE_HTML_TEMPLATE</span> <span class="o">=</span> <span class="n">_CSS</span> <span class="o">+</span> <span class="s2">"""</span> |
| <span class="s2"> <iframe id=</span><span class="si">{display_id}</span><span class="s2"> style="border:none" width="100%" height="600px"</span> |
| <span class="s2"> srcdoc='</span> |
| <span class="s2"> <script src="https://cdnjs.cloudflare.com/ajax/libs/webcomponentsjs/1.3.3/webcomponents-lite.js"></script></span> |
| <span class="s2"> <link rel="import" href="https://raw.githubusercontent.com/PAIR-code/facets/1.0.0/facets-dist/facets-jupyter.html"></span> |
| <span class="s2"> <facets-dive sprite-image-width="</span><span class="si">{sprite_size}</span><span class="s2">" sprite-image-height="</span><span class="si">{sprite_size}</span><span class="s2">" id="</span><span class="si">{display_id}</span><span class="s2">" height="600"></facets-dive></span> |
| <span class="s2"> <script></span> |
| <span class="s2"> document.getElementById("</span><span class="si">{display_id}</span><span class="s2">").data = </span><span class="si">{jsonstr}</span><span class="s2">;</span> |
| <span class="s2"> </script></span> |
| <span class="s2"> '></span> |
| <span class="s2"> </iframe>"""</span> |
| <span class="n">_OVERVIEW_SCRIPT_TEMPLATE</span> <span class="o">=</span> <span class="s2">"""</span> |
| <span class="s2"> try {{</span> |
| <span class="s2"> document</span> |
| <span class="s2"> .getElementById("</span><span class="si">{display_id}</span><span class="s2">")</span> |
| <span class="s2"> .contentDocument</span> |
| <span class="s2"> .getElementById("</span><span class="si">{display_id}</span><span class="s2">")</span> |
| <span class="s2"> .protoInput = "</span><span class="si">{protostr}</span><span class="s2">";</span> |
| <span class="s2"> }} catch (e) {{</span> |
| <span class="s2"> // NOOP when the user has cleared the output from the notebook.</span> |
| <span class="s2"> }}"""</span> |
| <span class="n">_OVERVIEW_HTML_TEMPLATE</span> <span class="o">=</span> <span class="n">_CSS</span> <span class="o">+</span> <span class="s2">"""</span> |
| <span class="s2"> <iframe id=</span><span class="si">{display_id}</span><span class="s2"> style="border:none" width="100%" height="600px"</span> |
| <span class="s2"> srcdoc='</span> |
| <span class="s2"> <script src="https://cdnjs.cloudflare.com/ajax/libs/webcomponentsjs/1.3.3/webcomponents-lite.js"></script></span> |
| <span class="s2"> <link rel="import" href="https://raw.githubusercontent.com/PAIR-code/facets/1.0.0/facets-dist/facets-jupyter.html"></span> |
| <span class="s2"> <facets-overview id="</span><span class="si">{display_id}</span><span class="s2">"></facets-overview></span> |
| <span class="s2"> <script></span> |
| <span class="s2"> document.getElementById("</span><span class="si">{display_id}</span><span class="s2">").protoInput = "</span><span class="si">{protostr}</span><span class="s2">";</span> |
| <span class="s2"> </script></span> |
| <span class="s2"> '></span> |
| <span class="s2"> </iframe>"""</span> |
| <span class="n">_DATATABLE_INITIALIZATION_CONFIG</span> <span class="o">=</span> <span class="s2">"""</span> |
| <span class="s2"> bAutoWidth: false,</span> |
| <span class="s2"> columns: </span><span class="si">{columns}</span><span class="s2">,</span> |
| <span class="s2"> destroy: true,</span> |
| <span class="s2"> responsive: true,</span> |
| <span class="s2"> columnDefs: [</span> |
| <span class="s2"> {{</span> |
| <span class="s2"> targets: "_all",</span> |
| <span class="s2"> className: "dt-left"</span> |
| <span class="s2"> }},</span> |
| <span class="s2"> {{</span> |
| <span class="s2"> "targets": 0,</span> |
| <span class="s2"> "width": "10px",</span> |
| <span class="s2"> "title": ""</span> |
| <span class="s2"> }}</span> |
| <span class="s2"> ]"""</span> |
| <span class="n">_DATAFRAME_SCRIPT_TEMPLATE</span> <span class="o">=</span> <span class="s2">"""</span> |
| <span class="s2"> var dt;</span> |
| <span class="s2"> if ($.fn.dataTable.isDataTable("#</span><span class="si">{table_id}</span><span class="s2">")) {{</span> |
| <span class="s2"> dt = $("#</span><span class="si">{table_id}</span><span class="s2">").dataTable();</span> |
| <span class="s2"> }} else if ($("#</span><span class="si">{table_id}</span><span class="s2">_wrapper").length == 0) {{</span> |
| <span class="s2"> dt = $("#</span><span class="si">{table_id}</span><span class="s2">").dataTable({{</span> |
| <span class="s2"> """</span> <span class="o">+</span> <span class="n">_DATATABLE_INITIALIZATION_CONFIG</span> <span class="o">+</span> <span class="s2">"""</span> |
| <span class="s2"> }});</span> |
| <span class="s2"> }} else {{</span> |
| <span class="s2"> return;</span> |
| <span class="s2"> }}</span> |
| <span class="s2"> dt.api()</span> |
| <span class="s2"> .clear()</span> |
| <span class="s2"> .rows.add(</span><span class="si">{data_as_rows}</span><span class="s2">)</span> |
| <span class="s2"> .draw('full-hold');"""</span> |
| <span class="n">_DATAFRAME_PAGINATION_TEMPLATE</span> <span class="o">=</span> <span class="n">_CSS</span> <span class="o">+</span> <span class="s2">"""</span> |
| <span class="s2"> <link rel="stylesheet" href="https://cdn.datatables.net/1.10.20/css/jquery.dataTables.min.css"></span> |
| <span class="s2"> <table id="</span><span class="si">{table_id}</span><span class="s2">" class="display" style="display:block"></table></span> |
| <span class="s2"> <script></span> |
| <span class="s2"> </span><span class="si">{script_in_jquery_with_datatable}</span> |
| <span class="s2"> </script>"""</span> |
| <span class="n">_NO_DATA_TEMPLATE</span> <span class="o">=</span> <span class="n">_CSS</span> <span class="o">+</span> <span class="s2">"""</span> |
| <span class="s2"> <div id="no_data_</span><span class="si">{id}</span><span class="s2">">No data to display.</div>"""</span> |
| <span class="n">_NO_DATA_REMOVAL_SCRIPT</span> <span class="o">=</span> <span class="s2">"""</span> |
| <span class="s2"> $("#no_data_</span><span class="si">{id}</span><span class="s2">").remove();"""</span> |
| |
| |
| <div class="viewcode-block" id="visualize"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.visualize">[docs]</a><span class="k">def</span> <span class="nf">visualize</span><span class="p">(</span> |
| <span class="n">stream</span><span class="p">,</span> |
| <span class="n">dynamic_plotting_interval</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">include_window_info</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">display_facets</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">element_type</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Visualizes the data of a given PCollection. Optionally enables dynamic</span> |
| <span class="sd"> plotting with interval in seconds if the PCollection is being produced by a</span> |
| <span class="sd"> running pipeline or the pipeline is streaming indefinitely. The function</span> |
| <span class="sd"> always returns immediately and is asynchronous when dynamic plotting is on.</span> |
| |
| <span class="sd"> If dynamic plotting is enabled, the visualization is updated continuously until</span> |
| <span class="sd"> the pipeline producing the PCollection is in an end state. The visualization</span> |
| <span class="sd"> would be anchored to the notebook cell output area. The function</span> |
| <span class="sd"> asynchronously returns a handle to the visualization job immediately. The user</span> |
| <span class="sd"> could manually do::</span> |
| |
| <span class="sd"> # In one notebook cell, enable dynamic plotting every 1 second:</span> |
| <span class="sd"> handle = visualize(pcoll, dynamic_plotting_interval=1)</span> |
| <span class="sd"> # Visualization anchored to the cell's output area.</span> |
| <span class="sd"> # In a different cell:</span> |
| <span class="sd"> handle.stop()</span> |
| <span class="sd"> # Will stop the dynamic plotting of the above visualization manually.</span> |
| <span class="sd"> # Otherwise, dynamic plotting ends when pipeline is not running anymore.</span> |
| |
| <span class="sd"> If dynamic_plotting_interval is not set (the default), None is returned.</span> |
| |
| <span class="sd"> If include_window_info is True, the data will include window information,</span> |
| <span class="sd"> which consists of the event timestamps, windows, and pane info.</span> |
| |
| <span class="sd"> If display_facets is True, the facets widgets will be rendered. Otherwise, the</span> |
| <span class="sd"> facets widgets will not be rendered.</span> |
| |
| <span class="sd"> The function is experimental. For internal use only; no</span> |
| <span class="sd"> backwards-compatibility guarantees.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">_pcoll_visualization_ready</span><span class="p">:</span> |
| <span class="k">return</span> <span class="kc">None</span> |
| <span class="n">pv</span> <span class="o">=</span> <span class="n">PCollectionVisualization</span><span class="p">(</span> |
| <span class="n">stream</span><span class="p">,</span> |
| <span class="n">include_window_info</span><span class="o">=</span><span class="n">include_window_info</span><span class="p">,</span> |
| <span class="n">display_facets</span><span class="o">=</span><span class="n">display_facets</span><span class="p">,</span> |
| <span class="n">element_type</span><span class="o">=</span><span class="n">element_type</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">is_in_notebook</span><span class="p">:</span> |
| <span class="n">pv</span><span class="o">.</span><span class="n">display</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">pv</span><span class="o">.</span><span class="n">display_plain_text</span><span class="p">()</span> |
| <span class="c1"># We don't want to do dynamic plotting if there is no notebook frontend.</span> |
| <span class="k">return</span> <span class="kc">None</span> |
| |
| <span class="k">if</span> <span class="n">dynamic_plotting_interval</span><span class="p">:</span> |
| <span class="c1"># Disables the verbose logging from timeloop.</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="s1">'timeloop'</span><span class="p">)</span><span class="o">.</span><span class="n">disabled</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="n">tl</span> <span class="o">=</span> <span class="n">Timeloop</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">dynamic_plotting</span><span class="p">(</span><span class="n">stream</span><span class="p">,</span> <span class="n">pv</span><span class="p">,</span> <span class="n">tl</span><span class="p">,</span> <span class="n">include_window_info</span><span class="p">,</span> <span class="n">display_facets</span><span class="p">):</span> |
| <span class="nd">@tl</span><span class="o">.</span><span class="n">job</span><span class="p">(</span><span class="n">interval</span><span class="o">=</span><span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">dynamic_plotting_interval</span><span class="p">))</span> |
| <span class="k">def</span> <span class="nf">continuous_update_display</span><span class="p">():</span> <span class="c1"># pylint: disable=unused-variable</span> |
| <span class="c1"># Always creates a new PCollVisualization instance when the</span> |
| <span class="c1"># PCollection materialization is being updated and dynamic</span> |
| <span class="c1"># plotting is in-process.</span> |
| <span class="c1"># PCollectionVisualization created at this level doesn't need dynamic</span> |
| <span class="c1"># plotting interval information when instantiated because it's already</span> |
| <span class="c1"># in dynamic plotting logic.</span> |
| <span class="n">updated_pv</span> <span class="o">=</span> <span class="n">PCollectionVisualization</span><span class="p">(</span> |
| <span class="n">stream</span><span class="p">,</span> |
| <span class="n">include_window_info</span><span class="o">=</span><span class="n">include_window_info</span><span class="p">,</span> |
| <span class="n">display_facets</span><span class="o">=</span><span class="n">display_facets</span><span class="p">,</span> |
| <span class="n">element_type</span><span class="o">=</span><span class="n">element_type</span><span class="p">)</span> |
| <span class="n">updated_pv</span><span class="o">.</span><span class="n">display</span><span class="p">(</span><span class="n">updating_pv</span><span class="o">=</span><span class="n">pv</span><span class="p">)</span> |
| |
| <span class="c1"># Stop updating the visualizations as soon as the stream will not yield</span> |
| <span class="c1"># new elements.</span> |
| <span class="k">if</span> <span class="n">stream</span><span class="o">.</span><span class="n">is_done</span><span class="p">():</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">tl</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span> |
| <span class="k">except</span> <span class="ne">RuntimeError</span><span class="p">:</span> |
| <span class="c1"># The job can only be stopped once. Ignore excessive stops.</span> |
| <span class="k">pass</span> |
| |
| <span class="n">tl</span><span class="o">.</span><span class="n">start</span><span class="p">()</span> |
| <span class="k">return</span> <span class="n">tl</span> |
| |
| <span class="k">return</span> <span class="n">dynamic_plotting</span><span class="p">(</span><span class="n">stream</span><span class="p">,</span> <span class="n">pv</span><span class="p">,</span> <span class="n">tl</span><span class="p">,</span> <span class="n">include_window_info</span><span class="p">,</span> <span class="n">display_facets</span><span class="p">)</span> |
| <span class="k">return</span> <span class="kc">None</span></div> |
| |
| |
| <div class="viewcode-block" id="visualize_computed_pcoll"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.visualize_computed_pcoll">[docs]</a><span class="k">def</span> <span class="nf">visualize_computed_pcoll</span><span class="p">(</span> |
| <span class="n">pcoll_name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">pcoll</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">PCollection</span><span class="p">,</span> |
| <span class="n">max_n</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> |
| <span class="n">max_duration_secs</span><span class="p">:</span> <span class="nb">float</span><span class="p">,</span> |
| <span class="n">dynamic_plotting_interval</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">include_window_info</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span> |
| <span class="n">display_facets</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""A simple visualize alternative.</span> |
| |
| <span class="sd"> When the pcoll_name and pcoll pair identifies a watched and computed</span> |
| <span class="sd"> PCollection in the current interactive environment without ambiguity, an</span> |
| <span class="sd"> ElementStream can be built directly from cache. Returns immediately; the</span> |
| <span class="sd"> visualization is asynchronous but guaranteed to end in the near future.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> pcoll_name: the variable name of the PCollection.</span> |
| <span class="sd"> pcoll: the PCollection to be visualized.</span> |
| <span class="sd"> max_n: the maximum number of elements to visualize.</span> |
| <span class="sd"> max_duration_secs: max duration of elements to read in seconds.</span> |
| <span class="sd"> dynamic_plotting_interval: the interval in seconds between visualization</span> |
| <span class="sd"> updates if provided; otherwise, no dynamic plotting.</span> |
| <span class="sd"> include_window_info: whether to include windowing info in the elements.</span> |
| <span class="sd"> display_facets: whether to display the facets widgets.</span> |
| <span class="sd"> """</span> |
| <span class="n">pipeline</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">user_pipeline</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span> |
| <span class="n">rm</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">get_recording_manager</span><span class="p">(</span><span class="n">pipeline</span><span class="p">,</span> <span class="n">create_if_absent</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> |
| |
| <span class="n">stream</span> <span class="o">=</span> <span class="n">rm</span><span class="o">.</span><span class="n">read</span><span class="p">(</span> |
| <span class="n">pcoll_name</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">,</span> <span class="n">max_n</span><span class="o">=</span><span class="n">max_n</span><span class="p">,</span> <span class="n">max_duration_secs</span><span class="o">=</span><span class="n">max_duration_secs</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">stream</span><span class="p">:</span> |
| <span class="n">visualize</span><span class="p">(</span> |
| <span class="n">stream</span><span class="p">,</span> |
| <span class="n">dynamic_plotting_interval</span><span class="o">=</span><span class="n">dynamic_plotting_interval</span><span class="p">,</span> |
| <span class="n">include_window_info</span><span class="o">=</span><span class="n">include_window_info</span><span class="p">,</span> |
| <span class="n">display_facets</span><span class="o">=</span><span class="n">display_facets</span><span class="p">,</span> |
| <span class="n">element_type</span><span class="o">=</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">)</span></div> |
| |
| |
| <div class="viewcode-block" id="PCollectionVisualization"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.PCollectionVisualization">[docs]</a><span class="k">class</span> <span class="nc">PCollectionVisualization</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A visualization of a PCollection.</span> |
| |
| <span class="sd"> The class relies on creating a PipelineInstrument w/o actual instrument to</span> |
| <span class="sd"> access current interactive environment for materialized PCollection data at</span> |
| <span class="sd"> the moment of self instantiation through cache.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">stream</span><span class="p">,</span> |
| <span class="n">include_window_info</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">display_facets</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">element_type</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">assert</span> <span class="n">_pcoll_visualization_ready</span><span class="p">,</span> <span class="p">(</span> |
| <span class="s1">'Dependencies for PCollection visualization are not available. Please '</span> |
| <span class="s1">'use `pip install apache-beam[interactive]` to install necessary '</span> |
| <span class="s1">'dependencies and make sure that you are executing code in an '</span> |
| <span class="s1">'interactive environment such as a Jupyter notebook.'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_stream</span> <span class="o">=</span> <span class="n">stream</span> |
| <span class="c1"># Variable name as the title for element value in the rendered data table.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_pcoll_var</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="n">var</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pcoll_var</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_pcoll_var</span> <span class="o">=</span> <span class="s1">'Value'</span> |
| <span class="n">obfuscated_id</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="n">display_id</span><span class="p">(</span><span class="nb">id</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_dive_display_id</span> <span class="o">=</span> <span class="s1">'facets_dive_</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">obfuscated_id</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_overview_display_id</span> <span class="o">=</span> <span class="s1">'facets_overview_</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">obfuscated_id</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_df_display_id</span> <span class="o">=</span> <span class="s1">'df_</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">obfuscated_id</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_include_window_info</span> <span class="o">=</span> <span class="n">include_window_info</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_display_facets</span> <span class="o">=</span> <span class="n">display_facets</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_is_datatable_empty</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_element_type</span> <span class="o">=</span> <span class="n">element_type</span> |
| |
| <div class="viewcode-block" id="PCollectionVisualization.display_plain_text"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.PCollectionVisualization.display_plain_text">[docs]</a> <span class="k">def</span> <span class="nf">display_plain_text</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Displays a head sample of the normalized PCollection data.</span> |
| |
| <span class="sd"> This function is used when the ipython kernel is not connected to a</span> |
| <span class="sd"> notebook frontend such as when running ipython in terminal or in unit tests.</span> |
| <span class="sd"> It's a visualization in a terminal-like UI, not a function to retrieve data</span> |
| <span class="sd"> for programmatic usage.</span> |
| <span class="sd"> """</span> |
| <span class="c1"># Double check if the dependency is ready in case someone mistakenly uses</span> |
| <span class="c1"># the function.</span> |
| <span class="k">if</span> <span class="n">_pcoll_visualization_ready</span><span class="p">:</span> |
| <span class="n">data</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_to_dataframe</span><span class="p">()</span> |
| <span class="c1"># Displays a data-table with at most 25 entries from the head.</span> |
| <span class="n">data_sample</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">head</span><span class="p">(</span><span class="mi">25</span><span class="p">)</span> |
| <span class="n">display</span><span class="p">(</span><span class="n">data_sample</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="PCollectionVisualization.display"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.PCollectionVisualization.display">[docs]</a> <span class="k">def</span> <span class="nf">display</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">updating_pv</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Displays the visualization through IPython.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> updating_pv: A PCollectionVisualization object. When provided, the</span> |
| <span class="sd"> display_id of each visualization part will inherit from the initial</span> |
| <span class="sd"> display of updating_pv and only update that visualization web element</span> |
| <span class="sd"> instead of creating new ones.</span> |
| |
| <span class="sd"> The visualization has 3 parts: facets-dive, facets-overview and paginated</span> |
| <span class="sd"> data table. Each part is assigned an auto-generated unique display id</span> |
| <span class="sd"> (the uniqueness is guaranteed throughout the lifespan of the PCollection</span> |
| <span class="sd"> variable).</span> |
| <span class="sd"> """</span> |
| <span class="c1"># Ensures that dive, overview and table render the same data because the</span> |
| <span class="c1"># materialized PCollection data might be updated continuously.</span> |
| <span class="n">data</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_to_dataframe</span><span class="p">()</span> |
| <span class="c1"># Prefix integer column names with the PCollection variable name when visualizing.</span> |
| <span class="n">data</span><span class="o">.</span><span class="n">columns</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_pcoll_var</span> <span class="o">+</span> <span class="s1">'.'</span> <span class="o">+</span> |
| <span class="nb">str</span><span class="p">(</span><span class="n">column</span><span class="p">)</span> <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">column</span><span class="p">,</span> <span class="nb">int</span><span class="p">)</span> <span class="k">else</span> <span class="n">column</span> |
| <span class="k">for</span> <span class="n">column</span> <span class="ow">in</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span> |
| <span class="p">]</span> |
| <span class="c1"># String-ify the dictionaries for display because elements of type dict</span> |
| <span class="c1"># cannot be ordered.</span> |
| <span class="n">data</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">applymap</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">x</span><span class="p">)</span> <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="nb">dict</span><span class="p">)</span> <span class="k">else</span> <span class="n">x</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">updating_pv</span><span class="p">:</span> |
| <span class="c1"># Only updates when data is not empty. Otherwise, consider it a bad</span> |
| <span class="c1"># iteration and noop since there is nothing to be updated.</span> |
| <span class="k">if</span> <span class="n">data</span><span class="o">.</span><span class="n">empty</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'Skip a visualization update due to empty data.'</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_display_dataframe</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">deep</span><span class="o">=</span><span class="kc">True</span><span class="p">),</span> <span class="n">updating_pv</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_display_facets</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_display_dive</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">deep</span><span class="o">=</span><span class="kc">True</span><span class="p">),</span> <span class="n">updating_pv</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_display_overview</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">deep</span><span class="o">=</span><span class="kc">True</span><span class="p">),</span> <span class="n">updating_pv</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_display_dataframe</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">deep</span><span class="o">=</span><span class="kc">True</span><span class="p">))</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_display_facets</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_display_dive</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">deep</span><span class="o">=</span><span class="kc">True</span><span class="p">))</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_display_overview</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">deep</span><span class="o">=</span><span class="kc">True</span><span class="p">))</span></div> |
| |
| <span class="k">def</span> <span class="nf">_display_dive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">data</span><span class="p">,</span> <span class="n">update</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="n">sprite_size</span> <span class="o">=</span> <span class="mi">32</span> <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">data</span><span class="o">.</span><span class="n">index</span><span class="p">)</span> <span class="o">></span> <span class="mi">50000</span> <span class="k">else</span> <span class="mi">64</span> |
| <span class="n">format_window_info_in_dataframe</span><span class="p">(</span><span class="n">data</span><span class="p">)</span> |
| <span class="n">jsonstr</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">to_json</span><span class="p">(</span><span class="n">orient</span><span class="o">=</span><span class="s1">'records'</span><span class="p">,</span> <span class="n">default_handler</span><span class="o">=</span><span class="nb">str</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">update</span><span class="p">:</span> |
| <span class="n">script</span> <span class="o">=</span> <span class="n">_DIVE_SCRIPT_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">display_id</span><span class="o">=</span><span class="n">update</span><span class="o">.</span><span class="n">_dive_display_id</span><span class="p">,</span> <span class="n">jsonstr</span><span class="o">=</span><span class="n">jsonstr</span><span class="p">)</span> |
| <span class="n">display_javascript</span><span class="p">(</span><span class="n">Javascript</span><span class="p">(</span><span class="n">script</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">html_str</span> <span class="o">=</span> <span class="n">_DIVE_HTML_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">display_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_dive_display_id</span><span class="p">,</span> |
| <span class="n">jsonstr</span><span class="o">=</span><span class="n">html</span><span class="o">.</span><span class="n">escape</span><span class="p">(</span><span class="n">jsonstr</span><span class="p">),</span> |
| <span class="n">sprite_size</span><span class="o">=</span><span class="n">sprite_size</span><span class="p">)</span> |
| <span class="n">display</span><span class="p">(</span><span class="n">HTML</span><span class="p">(</span><span class="n">html_str</span><span class="p">))</span> |
| |
| <span class="k">def</span> <span class="nf">_display_overview</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">data</span><span class="p">,</span> <span class="n">update</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">if</span> <span class="p">(</span><span class="ow">not</span> <span class="n">data</span><span class="o">.</span><span class="n">empty</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">_include_window_info</span> <span class="ow">and</span> |
| <span class="nb">all</span><span class="p">(</span><span class="n">column</span> <span class="ow">in</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span> |
| <span class="k">for</span> <span class="n">column</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">'event_time'</span><span class="p">,</span> <span class="s1">'windows'</span><span class="p">,</span> <span class="s1">'pane_info'</span><span class="p">))):</span> |
| <span class="n">data</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">drop</span><span class="p">([</span><span class="s1">'event_time'</span><span class="p">,</span> <span class="s1">'windows'</span><span class="p">,</span> <span class="s1">'pane_info'</span><span class="p">],</span> <span class="n">axis</span><span class="o">=</span><span class="mi">1</span><span class="p">)</span> |
| |
| <span class="c1"># GFSG expects all column names to be strings.</span> |
| <span class="n">data</span><span class="o">.</span><span class="n">columns</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span><span class="o">.</span><span class="n">astype</span><span class="p">(</span><span class="nb">str</span><span class="p">)</span> |
| |
| <span class="n">gfsg</span> <span class="o">=</span> <span class="n">GenericFeatureStatisticsGenerator</span><span class="p">()</span> |
| <span class="n">proto</span> <span class="o">=</span> <span class="n">gfsg</span><span class="o">.</span><span class="n">ProtoFromDataFrames</span><span class="p">([{</span><span class="s1">'name'</span><span class="p">:</span> <span class="s1">'data'</span><span class="p">,</span> <span class="s1">'table'</span><span class="p">:</span> <span class="n">data</span><span class="p">}])</span> |
| <span class="n">protostr</span> <span class="o">=</span> <span class="n">base64</span><span class="o">.</span><span class="n">b64encode</span><span class="p">(</span><span class="n">proto</span><span class="o">.</span><span class="n">SerializeToString</span><span class="p">())</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">update</span><span class="p">:</span> |
| <span class="n">script</span> <span class="o">=</span> <span class="n">_OVERVIEW_SCRIPT_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">display_id</span><span class="o">=</span><span class="n">update</span><span class="o">.</span><span class="n">_overview_display_id</span><span class="p">,</span> <span class="n">protostr</span><span class="o">=</span><span class="n">protostr</span><span class="p">)</span> |
| <span class="n">display_javascript</span><span class="p">(</span><span class="n">Javascript</span><span class="p">(</span><span class="n">script</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">html_str</span> <span class="o">=</span> <span class="n">_OVERVIEW_HTML_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">display_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_overview_display_id</span><span class="p">,</span> <span class="n">protostr</span><span class="o">=</span><span class="n">protostr</span><span class="p">)</span> |
| <span class="n">display</span><span class="p">(</span><span class="n">HTML</span><span class="p">(</span><span class="n">html_str</span><span class="p">))</span> |
| |
| <span class="k">def</span> <span class="nf">_display_dataframe</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">data</span><span class="p">,</span> <span class="n">update</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="n">table_id</span> <span class="o">=</span> <span class="s1">'table_</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">update</span><span class="o">.</span><span class="n">_df_display_id</span> <span class="k">if</span> <span class="n">update</span> <span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df_display_id</span><span class="p">)</span> |
| <span class="n">columns</span> <span class="o">=</span> <span class="p">[{</span> |
| <span class="s1">'title'</span><span class="p">:</span> <span class="s1">''</span> |
| <span class="p">}]</span> <span class="o">+</span> <span class="p">[{</span> |
| <span class="s1">'title'</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">column</span><span class="p">)</span> |
| <span class="p">}</span> <span class="k">for</span> <span class="n">column</span> <span class="ow">in</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span><span class="p">]</span> |
| <span class="n">format_window_info_in_dataframe</span><span class="p">(</span><span class="n">data</span><span class="p">)</span> |
| <span class="c1"># Convert the dataframe into rows, each row looks like</span> |
| <span class="c1"># [column_1_val, column_2_val, ...].</span> |
| <span class="n">rows</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">applymap</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">x</span><span class="p">))</span><span class="o">.</span><span class="n">to_dict</span><span class="p">(</span><span class="s1">'split'</span><span class="p">)[</span><span class="s1">'data'</span><span class="p">]</span> |
| <span class="c1"># Convert each row into dict where keys are column index in the datatable</span> |
| <span class="c1"># to be rendered and values are data from the dataframe. Column index 0 is</span> |
| <span class="c1"># left out to hold the int index (not part of the data) from dataframe.</span> |
| <span class="c1"># Each row becomes: {1: column_1_val, 2: column_2_val, ...}.</span> |
| <span class="n">rows</span> <span class="o">=</span> <span class="p">[{</span><span class="n">k</span> <span class="o">+</span> <span class="mi">1</span><span class="p">:</span> <span class="n">v</span> <span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="n">row</span><span class="p">)}</span> <span class="k">for</span> <span class="n">row</span> <span class="ow">in</span> <span class="n">rows</span><span class="p">]</span> |
| <span class="c1"># Add the dataframe int index (used as default ordering column) to datatable</span> |
| <span class="c1"># column index 0 (will be rendered as the first column).</span> |
| <span class="c1"># Each row becomes:</span> |
| <span class="c1"># {1: column_1_val, 2: column_2_val, ..., 0: int_index_in_dataframe}.</span> |
| <span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">row</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="n">rows</span><span class="p">):</span> |
| <span class="n">row</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="o">=</span> <span class="n">k</span> |
| <span class="n">script</span> <span class="o">=</span> <span class="n">_DATAFRAME_SCRIPT_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">table_id</span><span class="o">=</span><span class="n">table_id</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="n">columns</span><span class="p">,</span> <span class="n">data_as_rows</span><span class="o">=</span><span class="n">rows</span><span class="p">)</span> |
| <span class="n">script_in_jquery_with_datatable</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">_JQUERY_WITH_DATATABLE_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">customized_script</span><span class="o">=</span><span class="n">script</span><span class="p">)</span> |
| <span class="c1"># Dynamically load data into the existing datatable if not empty.</span> |
| <span class="k">if</span> <span class="n">update</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">update</span><span class="o">.</span><span class="n">_is_datatable_empty</span><span class="p">:</span> |
| <span class="n">display_javascript</span><span class="p">(</span><span class="n">Javascript</span><span class="p">(</span><span class="n">script_in_jquery_with_datatable</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">data</span><span class="o">.</span><span class="n">empty</span><span class="p">:</span> |
| <span class="n">html_str</span> <span class="o">=</span> <span class="n">_NO_DATA_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">id</span><span class="o">=</span><span class="n">table_id</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">html_str</span> <span class="o">=</span> <span class="n">_DATAFRAME_PAGINATION_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">table_id</span><span class="o">=</span><span class="n">table_id</span><span class="p">,</span> |
| <span class="n">script_in_jquery_with_datatable</span><span class="o">=</span><span class="n">script_in_jquery_with_datatable</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">update</span><span class="p">:</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">data</span><span class="o">.</span><span class="n">empty</span><span class="p">:</span> |
| <span class="c1"># Initialize a datatable to replace the existing no data div.</span> |
| <span class="n">display</span><span class="p">(</span> |
| <span class="n">Javascript</span><span class="p">(</span> |
| <span class="n">ie</span><span class="o">.</span><span class="n">_JQUERY_WITH_DATATABLE_TEMPLATE</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">customized_script</span><span class="o">=</span><span class="n">_NO_DATA_REMOVAL_SCRIPT</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="nb">id</span><span class="o">=</span><span class="n">table_id</span><span class="p">))))</span> |
| <span class="n">display</span><span class="p">(</span><span class="n">HTML</span><span class="p">(</span><span class="n">html_str</span><span class="p">),</span> <span class="n">display_id</span><span class="o">=</span><span class="n">update</span><span class="o">.</span><span class="n">_df_display_id</span><span class="p">)</span> |
| <span class="n">update</span><span class="o">.</span><span class="n">_is_datatable_empty</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">display</span><span class="p">(</span><span class="n">HTML</span><span class="p">(</span><span class="n">html_str</span><span class="p">),</span> <span class="n">display_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_df_display_id</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">data</span><span class="o">.</span><span class="n">empty</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_is_datatable_empty</span> <span class="o">=</span> <span class="kc">False</span> |
| |
| <span class="k">def</span> <span class="nf">_to_dataframe</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">results</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_stream</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="n">tail</span><span class="o">=</span><span class="kc">False</span><span class="p">))</span> |
| <span class="k">return</span> <span class="n">elements_to_df</span><span class="p">(</span> |
| <span class="n">results</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_include_window_info</span><span class="p">,</span> <span class="n">element_type</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_element_type</span><span class="p">)</span></div> |
| |
| |
| <div class="viewcode-block" id="format_window_info_in_dataframe"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.format_window_info_in_dataframe">[docs]</a><span class="k">def</span> <span class="nf">format_window_info_in_dataframe</span><span class="p">(</span><span class="n">data</span><span class="p">):</span> |
| <span class="k">if</span> <span class="s1">'event_time'</span> <span class="ow">in</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span><span class="p">:</span> |
| <span class="n">data</span><span class="p">[</span><span class="s1">'event_time'</span><span class="p">]</span> <span class="o">=</span> <span class="n">data</span><span class="p">[</span><span class="s1">'event_time'</span><span class="p">]</span><span class="o">.</span><span class="n">apply</span><span class="p">(</span><span class="n">event_time_formatter</span><span class="p">)</span> |
| <span class="k">if</span> <span class="s1">'windows'</span> <span class="ow">in</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span><span class="p">:</span> |
| <span class="n">data</span><span class="p">[</span><span class="s1">'windows'</span><span class="p">]</span> <span class="o">=</span> <span class="n">data</span><span class="p">[</span><span class="s1">'windows'</span><span class="p">]</span><span class="o">.</span><span class="n">apply</span><span class="p">(</span><span class="n">windows_formatter</span><span class="p">)</span> |
| <span class="k">if</span> <span class="s1">'pane_info'</span> <span class="ow">in</span> <span class="n">data</span><span class="o">.</span><span class="n">columns</span><span class="p">:</span> |
| <span class="n">data</span><span class="p">[</span><span class="s1">'pane_info'</span><span class="p">]</span> <span class="o">=</span> <span class="n">data</span><span class="p">[</span><span class="s1">'pane_info'</span><span class="p">]</span><span class="o">.</span><span class="n">apply</span><span class="p">(</span><span class="n">pane_info_formatter</span><span class="p">)</span></div> |
| |
| |
| <!-- Highlighted source listing for event_time_formatter, with a [docs] back-link to its API page entry. NOTE(review): the listed Python calls datetime.utcfromtimestamp, which is deprecated as of Python 3.12; any change belongs in the Python module this page is generated from, not in this markup. --><div class="viewcode-block" id="event_time_formatter"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.event_time_formatter">[docs]</a><span class="k">def</span> <span class="nf">event_time_formatter</span><span class="p">(</span><span class="n">event_time_us</span><span class="p">):</span> |
| <span class="n">options</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">options</span> |
| <span class="n">to_tz</span> <span class="o">=</span> <span class="n">options</span><span class="o">.</span><span class="n">display_timezone</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="n">datetime</span><span class="o">.</span><span class="n">datetime</span><span class="o">.</span><span class="n">utcfromtimestamp</span><span class="p">(</span><span class="n">event_time_us</span> <span class="o">/</span> <span class="mi">1000000</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span> |
| <span class="n">tzinfo</span><span class="o">=</span><span class="n">tz</span><span class="o">.</span><span class="n">tzutc</span><span class="p">())</span><span class="o">.</span><span class="n">astimezone</span><span class="p">(</span><span class="n">to_tz</span><span class="p">)</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span> |
| <span class="n">options</span><span class="o">.</span><span class="n">display_timestamp_format</span><span class="p">))</span> |
| <span class="k">except</span> <span class="ne">ValueError</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">event_time_us</span> <span class="o"><</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="k">return</span> <span class="s1">'Min Timestamp'</span> |
| <span class="k">return</span> <span class="s1">'Max Timestamp'</span></div> |
| |
| |
| <!-- Highlighted source listing for windows_formatter, with a [docs] back-link to its API page entry. NOTE(review): in the listed Python the fractional-seconds format is only reached inside the "if seconds" branch, so a window shorter than one second appears to get an empty duration string; confirm upstream. Any fix belongs in the Python module this page is generated from. --><div class="viewcode-block" id="windows_formatter"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.windows_formatter">[docs]</a><span class="k">def</span> <span class="nf">windows_formatter</span><span class="p">(</span><span class="n">windows</span><span class="p">):</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">for</span> <span class="n">w</span> <span class="ow">in</span> <span class="n">windows</span><span class="p">:</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">w</span><span class="p">,</span> <span class="n">GlobalWindow</span><span class="p">):</span> |
| <span class="n">result</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">w</span><span class="p">))</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">w</span><span class="p">,</span> <span class="n">IntervalWindow</span><span class="p">):</span> |
| <span class="c1"># First get the duration in terms of hours, minutes, seconds, and</span> |
| <span class="c1"># micros.</span> |
| <span class="n">duration</span> <span class="o">=</span> <span class="n">w</span><span class="o">.</span><span class="n">end</span><span class="o">.</span><span class="n">micros</span> <span class="o">-</span> <span class="n">w</span><span class="o">.</span><span class="n">start</span><span class="o">.</span><span class="n">micros</span> |
| <span class="n">duration_secs</span> <span class="o">=</span> <span class="n">duration</span> <span class="o">//</span> <span class="mi">1000000</span> |
| <span class="n">hours</span><span class="p">,</span> <span class="n">remainder</span> <span class="o">=</span> <span class="nb">divmod</span><span class="p">(</span><span class="n">duration_secs</span><span class="p">,</span> <span class="mi">3600</span><span class="p">)</span> |
| <span class="n">minutes</span><span class="p">,</span> <span class="n">seconds</span> <span class="o">=</span> <span class="nb">divmod</span><span class="p">(</span><span class="n">remainder</span><span class="p">,</span> <span class="mi">60</span><span class="p">)</span> |
| <span class="n">micros</span> <span class="o">=</span> <span class="p">(</span><span class="n">duration</span> <span class="o">-</span> <span class="n">duration_secs</span> <span class="o">*</span> <span class="mi">1000000</span><span class="p">)</span> <span class="o">%</span> <span class="mi">1000000</span> |
| |
| <span class="c1"># Construct the duration string. Try and write the string in such a</span> |
| <span class="c1"># way that minimizes the amount of characters written.</span> |
| <span class="n">duration</span> <span class="o">=</span> <span class="s1">''</span> |
| <span class="k">if</span> <span class="n">hours</span><span class="p">:</span> |
| <span class="n">duration</span> <span class="o">+=</span> <span class="s1">'</span><span class="si">{}</span><span class="s1">h '</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">hours</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">minutes</span> <span class="ow">or</span> <span class="p">(</span><span class="n">hours</span> <span class="ow">and</span> <span class="n">seconds</span><span class="p">):</span> |
| <span class="n">duration</span> <span class="o">+=</span> <span class="s1">'</span><span class="si">{}</span><span class="s1">m '</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">minutes</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">seconds</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">micros</span><span class="p">:</span> |
| <span class="n">duration</span> <span class="o">+=</span> <span class="s1">'</span><span class="si">{}</span><span class="s1">.</span><span class="si">{:06}</span><span class="s1">s'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">seconds</span><span class="p">,</span> <span class="n">micros</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">duration</span> <span class="o">+=</span> <span class="s1">'</span><span class="si">{}</span><span class="s1">s'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">seconds</span><span class="p">)</span> |
| |
| <span class="n">start</span> <span class="o">=</span> <span class="n">event_time_formatter</span><span class="p">(</span><span class="n">w</span><span class="o">.</span><span class="n">start</span><span class="o">.</span><span class="n">micros</span><span class="p">)</span> |
| |
| <span class="n">result</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">'</span><span class="si">{}</span><span class="s1"> (</span><span class="si">{}</span><span class="s1">)'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">start</span><span class="p">,</span> <span class="n">duration</span><span class="p">))</span> |
| |
| <span class="k">return</span> <span class="s1">','</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">result</span><span class="p">)</span></div> |
| |
| |
| <!-- Highlighted source listing for pane_info_formatter, with a [docs] back-link to its API page entry. This is the last listing on the page; the pre element that contains it is closed on the row after this block. --><div class="viewcode-block" id="pane_info_formatter"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.display.pcoll_visualization.html#apache_beam.runners.interactive.display.pcoll_visualization.pane_info_formatter">[docs]</a><span class="k">def</span> <span class="nf">pane_info_formatter</span><span class="p">(</span><span class="n">pane_info</span><span class="p">):</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.utils.windowed_value</span> <span class="kn">import</span> <span class="n">PaneInfo</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.utils.windowed_value</span> <span class="kn">import</span> <span class="n">PaneInfoTiming</span> |
| <span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">pane_info</span><span class="p">,</span> <span class="n">PaneInfo</span><span class="p">)</span> |
| |
| <span class="n">result</span> <span class="o">=</span> <span class="s1">'Pane </span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">pane_info</span><span class="o">.</span><span class="n">index</span><span class="p">)</span> |
| <span class="n">timing_info</span> <span class="o">=</span> <span class="s1">'</span><span class="si">{}{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="s1">'Final '</span> <span class="k">if</span> <span class="n">pane_info</span><span class="o">.</span><span class="n">is_last</span> <span class="k">else</span> <span class="s1">''</span><span class="p">,</span> |
| <span class="n">PaneInfoTiming</span><span class="o">.</span><span class="n">to_string</span><span class="p">(</span><span class="n">pane_info</span><span class="o">.</span><span class="n">timing</span><span class="p">)</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span><span class="o">.</span><span class="n">capitalize</span><span class="p">()</span> <span class="k">if</span> |
| <span class="n">pane_info</span><span class="o">.</span><span class="n">timing</span> <span class="ow">in</span> <span class="p">(</span><span class="n">PaneInfoTiming</span><span class="o">.</span><span class="n">EARLY</span><span class="p">,</span> <span class="n">PaneInfoTiming</span><span class="o">.</span><span class="n">LATE</span><span class="p">)</span> <span class="k">else</span> <span class="s1">''</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">timing_info</span><span class="p">:</span> |
| <span class="n">result</span> <span class="o">+=</span> <span class="s1">': '</span> <span class="o">+</span> <span class="n">timing_info</span> |
| |
| <span class="k">return</span> <span class="n">result</span></div> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| <footer> |
| <!-- Page footer: a copyright notice followed by the Sphinx / Read the Docs theme credits. --> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <!-- NOTE(review): the copyright holder after the symbol is empty; presumably the Sphinx "copyright" setting is unset for this build. Confirm and fill it in at the source. --> |
| <p> |
| © Copyright |
| |
| </p> |
| </div> |
| <!-- The Sphinx credit link uses the canonical HTTPS address instead of plain HTTP. --> |
| Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <!-- Bootstraps the theme's navigation: the callback passed to jQuery() runs once the DOM is ready and calls SphinxRtdTheme.Navigation.enable(true). SphinxRtdTheme is presumably defined by _static/js/theme.js, which is loaded in the head; confirm. --><script type="text/javascript"> |
| jQuery(function () { |
| SphinxRtdTheme.Navigation.enable(true); |
| }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |