blob: d1e4523642fdeb98286b32652230c059036654fe [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>apache_beam.runners.interactive.sql.beam_sql_magics &mdash; Apache Beam 2.47.0 documentation</title>
  <!-- Classic (non-async) scripts execute in document order, so keep this ordering; only MathJax is fetched async. -->
  <script src="../../../../../_static/js/modernizr.min.js"></script>
  <script id="documentation_options" data-url_root="../../../../../" src="../../../../../_static/documentation_options.js"></script>
  <script src="../../../../../_static/jquery.js"></script>
  <script src="../../../../../_static/underscore.js"></script>
  <script src="../../../../../_static/doctools.js"></script>
  <script src="../../../../../_static/language_data.js"></script>
  <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
  <script src="../../../../../_static/js/theme.js"></script>
  <link rel="stylesheet" href="../../../../../_static/css/theme.css" />
  <link rel="stylesheet" href="../../../../../_static/pygments.css" />
  <link rel="index" title="Index" href="../../../../../genindex.html" />
  <link rel="search" title="Search" href="../../../../../search.html" />
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side" aria-label="Sidebar">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../../../index.html" class="icon icon-home"> Apache Beam
</a>
<div class="version">
2.47.0
</div>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../../../search.html" method="get">
<!-- A placeholder is not an accessible name, so aria-label names the field without changing the themed layout. type stays "text" because the theme stylesheet may style input[type="text"]; verify before switching to type="search". -->
<input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.ml.html">apache_beam.ml package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.testing.html">apache_beam.testing package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.utils.html">apache_beam.utils package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.yaml.html">apache_beam.yaml package</a></li>
</ul>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
  <!-- Compact header shown by the theme on narrow viewports. The icon carries data-toggle="wy-nav-top"; its click behavior is presumably bound by theme.js (verify). NOTE(review): an <i> element is neither focusable nor named, so keyboard and screen-reader users cannot operate this toggle; fixing it needs a real <button> plus matching theme CSS. -->
  <i class="fa fa-bars" data-toggle="wy-nav-top"></i>
  <a href="../../../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<!-- The &raquo; separators are decorative, so they are hidden from assistive technology; the visual output is unchanged. -->
<li><a href="../../../../../index.html">Docs</a> <span aria-hidden="true">&raquo;</span></li>
<li><a href="../../../../index.html">Module code</a> <span aria-hidden="true">&raquo;</span></li>
<!-- The last crumb is the page being viewed. -->
<li aria-current="page">apache_beam.runners.interactive.sql.beam_sql_magics</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.runners.interactive.sql.beam_sql_magics</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Module of beam_sql cell magic that executes a Beam SQL.</span>
<span class="sd">Only works within an IPython kernel.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="kn">import</span> <span class="nn">argparse</span>
<span class="kn">import</span> <span class="nn">importlib</span>
<span class="kn">import</span> <span class="nn">keyword</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">traceback</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Dict</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">List</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Optional</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Tuple</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Union</span>
<span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span>
<span class="kn">from</span> <span class="nn">apache_beam.pvalue</span> <span class="kn">import</span> <span class="n">PValue</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_environment</span> <span class="k">as</span> <span class="n">ie</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.background_caching_job</span> <span class="kn">import</span> <span class="n">has_source_to_cache</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.caching.cacheable</span> <span class="kn">import</span> <span class="n">CacheKey</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.caching.reify</span> <span class="kn">import</span> <span class="n">reify_to_cache</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.caching.reify</span> <span class="kn">import</span> <span class="n">unreify_from_cache</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.display.pcoll_visualization</span> <span class="kn">import</span> <span class="n">visualize_computed_pcoll</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.sql.sql_chain</span> <span class="kn">import</span> <span class="n">SqlChain</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.sql.sql_chain</span> <span class="kn">import</span> <span class="n">SqlNode</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.sql.utils</span> <span class="kn">import</span> <span class="n">DataflowOptionsForm</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.sql.utils</span> <span class="kn">import</span> <span class="n">find_pcolls</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.sql.utils</span> <span class="kn">import</span> <span class="n">pformat_namedtuple</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.sql.utils</span> <span class="kn">import</span> <span class="n">register_coder_for_schema</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.sql.utils</span> <span class="kn">import</span> <span class="n">replace_single_pcoll_token</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.utils</span> <span class="kn">import</span> <span class="n">create_var_in_main</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.utils</span> <span class="kn">import</span> <span class="n">obfuscate</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.utils</span> <span class="kn">import</span> <span class="n">pcoll_by_name</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive.utils</span> <span class="kn">import</span> <span class="n">progress_indicated</span>
<span class="kn">from</span> <span class="nn">apache_beam.testing</span> <span class="kn">import</span> <span class="n">test_stream</span>
<span class="kn">from</span> <span class="nn">apache_beam.testing.test_stream_service</span> <span class="kn">import</span> <span class="n">TestStreamServiceController</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.sql</span> <span class="kn">import</span> <span class="n">SqlTransform</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints.native_type_compatibility</span> <span class="kn">import</span> <span class="n">match_is_named_tuple</span>
<span class="kn">from</span> <span class="nn">IPython.core.magic</span> <span class="kn">import</span> <span class="n">Magics</span>
<span class="kn">from</span> <span class="nn">IPython.core.magic</span> <span class="kn">import</span> <span class="n">line_cell_magic</span>
<span class="kn">from</span> <span class="nn">IPython.core.magic</span> <span class="kn">import</span> <span class="n">magics_class</span>
<!-- Rendered copy of the upstream module constants; fix issues in the .py source, not in this generated page.
  _LOGGER is the module logger used by on_error.
  _EXAMPLE_USAGE is passed as the argparse usage string in BeamSqlParser.__init__. argparse
  presumably %-formats usage text, which would explain why the magic prefixes are written as
  %% and %%%% (displaying as %beam_sql for the line magic and %%beam_sql for the cell magic).
  NOTE(review): confirm against the argparse documentation.
  _NOT_SUPPORTED_MSG has two %s placeholders: the execution failure, then the runner name.
  NOTE(review): upstream wording "might have not been supported" should read
  "might not have been supported".
  _SUPPORTED_RUNNERS is the allow-list that beam_sql checks the runner option against.
--><span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>
<span class="n">_EXAMPLE_USAGE</span> <span class="o">=</span> <span class="s2">&quot;&quot;&quot;beam_sql magic to execute Beam SQL in notebooks</span>
<span class="s2">---------------------------------------------------------</span>
<span class="si">%%</span><span class="s2">beam_sql [-o OUTPUT_NAME] [-v] [-r RUNNER] query</span>
<span class="s2">---------------------------------------------------------</span>
<span class="s2">Or</span>
<span class="s2">---------------------------------------------------------</span>
<span class="si">%%%%</span><span class="s2">beam_sql [-o OUTPUT_NAME] [-v] [-r RUNNER] query-line#1</span>
<span class="s2">query-line#2</span>
<span class="s2">...</span>
<span class="s2">query-line#N</span>
<span class="s2">---------------------------------------------------------</span>
<span class="s2">&quot;&quot;&quot;</span>
<span class="n">_NOT_SUPPORTED_MSG</span> <span class="o">=</span> <span class="s2">&quot;&quot;&quot;The query was valid and successfully applied.</span>
<span class="s2"> But beam_sql failed to execute the query: </span><span class="si">%s</span>
<span class="s2"> Runner used by beam_sql was </span><span class="si">%s</span><span class="s2">.</span>
<span class="s2"> Some Beam features might have not been supported by the Python SDK and runner combination.</span>
<span class="s2"> Please check the runner output for more details about the failed items.</span>
<span class="s2"> In the meantime, you may check:</span>
<span class="s2"> https://beam.apache.org/documentation/runners/capability-matrix/</span>
<span class="s2"> to choose a runner other than the InteractiveRunner and explicitly apply SqlTransform</span>
<span class="s2"> to build Beam pipelines in a non-interactive manner.</span>
<span class="s2">&quot;&quot;&quot;</span>
<span class="n">_SUPPORTED_RUNNERS</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;DirectRunner&#39;</span><span class="p">,</span> <span class="s1">&#39;DataflowRunner&#39;</span><span class="p">]</span>
<!-- Rendered copy of upstream BeamSqlParser; fix issues in the .py source, not in this generated page.
  BeamSqlParser wraps an argparse.ArgumentParser built with the _EXAMPLE_USAGE text. It defines:
  the -o option (dest output_name), the -v option (store_true, dest verbose), the -r option
  (dest runner; its help text embeds the repr of _SUPPORTED_RUNNERS), and a positional query
  collected with nargs='*', so query is a possibly empty list of str tokens.
  parse() returns the parsed Namespace. It re-raises KeyboardInterrupt and returns None for any
  other exception; per the in-code comment, -h/--help raises SystemExit 0, and argparse presumably
  raises SystemExit for invalid arguments too, so both paths yield None.
  NOTE(review): the parse() docstring lists output_name, verbose and query but omits the runner
  attribute that the parser also defines; worth adding upstream.
--><div class="viewcode-block" id="BeamSqlParser"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.sql.beam_sql_magics.html#apache_beam.runners.interactive.sql.beam_sql_magics.BeamSqlParser">[docs]</a><span class="k">class</span> <span class="nc">BeamSqlParser</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A parser to parse beam_sql inputs.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_parser</span> <span class="o">=</span> <span class="n">argparse</span><span class="o">.</span><span class="n">ArgumentParser</span><span class="p">(</span><span class="n">usage</span><span class="o">=</span><span class="n">_EXAMPLE_USAGE</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;-o&#39;</span><span class="p">,</span>
<span class="s1">&#39;--output-name&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;output_name&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;The output variable name of the magic, usually a PCollection. &#39;</span>
<span class="s1">&#39;Auto-generated if omitted.&#39;</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;-v&#39;</span><span class="p">,</span>
<span class="s1">&#39;--verbose&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Display more details about the magic execution.&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;-r&#39;</span><span class="p">,</span>
<span class="s1">&#39;--runner&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;runner&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;The runner to run the query. Supported runners are </span><span class="si">%s</span><span class="s1">. If not &#39;</span>
<span class="s1">&#39;provided, DirectRunner is used and results can be inspected &#39;</span>
<span class="s1">&#39;locally.&#39;</span> <span class="o">%</span> <span class="n">_SUPPORTED_RUNNERS</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;query&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">,</span>
<span class="n">nargs</span><span class="o">=</span><span class="s1">&#39;*&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;The Beam SQL query to execute. &#39;</span>
<span class="s1">&#39;Syntax: https://beam.apache.org/documentation/dsls/sql/calcite/&#39;</span>
<span class="s1">&#39;query-syntax/. &#39;</span>
<span class="s1">&#39;Please make sure that there is no conflict between your variable &#39;</span>
<span class="s1">&#39;names and the SQL keywords, such as &quot;SELECT&quot;, &quot;FROM&quot;, &quot;WHERE&quot; and &#39;</span>
<span class="s1">&#39;etc.&#39;</span><span class="p">))</span>
<div class="viewcode-block" id="BeamSqlParser.parse"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.sql.beam_sql_magics.html#apache_beam.runners.interactive.sql.beam_sql_magics.BeamSqlParser.parse">[docs]</a> <span class="k">def</span> <span class="nf">parse</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">args</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="n">argparse</span><span class="o">.</span><span class="n">Namespace</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Parses a list of string inputs.</span>
<span class="sd"> The parsed namespace contains these attributes:</span>
<span class="sd"> output_name: Optional[str], the output variable name.</span>
<span class="sd"> verbose: bool, whether to display more details of the magic execution.</span>
<span class="sd"> query: Optional[List[str]], the beam SQL query to execute.</span>
<span class="sd"> Returns:</span>
<span class="sd"> The parsed args or None if fail to parse.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">(</span><span class="n">args</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">KeyboardInterrupt</span><span class="p">:</span>
<span class="k">raise</span>
<span class="k">except</span><span class="p">:</span> <span class="c1"># pylint: disable=bare-except</span>
<span class="c1"># -h or --help results in SystemExit 0. Do not raise.</span>
<span class="k">return</span> <span class="kc">None</span></div>
<div class="viewcode-block" id="BeamSqlParser.print_help"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.sql.beam_sql_magics.html#apache_beam.runners.interactive.sql.beam_sql_magics.BeamSqlParser.print_help">[docs]</a> <span class="k">def</span> <span class="nf">print_help</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_parser</span><span class="o">.</span><span class="n">print_help</span><span class="p">()</span></div></div>
<!-- Rendered copy of upstream on_error; fix issues in the .py source, not in this generated page.
  on_error forwards error_msg and its %-style args to _LOGGER.error (formatting is left to the
  logger), then constructs a fresh BeamSqlParser solely to print the usage and help text.
--><div class="viewcode-block" id="on_error"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.sql.beam_sql_magics.html#apache_beam.runners.interactive.sql.beam_sql_magics.on_error">[docs]</a><span class="k">def</span> <span class="nf">on_error</span><span class="p">(</span><span class="n">error_msg</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Logs the error and the usage example.&quot;&quot;&quot;</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="n">error_msg</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">)</span>
<span class="n">BeamSqlParser</span><span class="p">()</span><span class="o">.</span><span class="n">print_help</span><span class="p">()</span></div>
<!-- Rendered copy of upstream BeamSqlMagics; this page must keep mirroring the released source, so fix issues in the .py file.
  __init__ eagerly initializes the interactive environment via ie.current_env() and owns one BeamSqlParser.
  beam_sql flow: append the cell (if any) to the line, whitespace-split and parse; reject an invalid
  output_name, an empty query or a runner outside _SUPPORTED_RUNNERS; locate referenced PCollections
  with find_pcolls; require NamedTuple element types, register their coders and record the schemas
  defined in __main__. With DirectRunner or no runner it calls collect_data_for_local_run, apply_sql
  and cache_output, then returns the output. With DataflowRunner it builds the pipeline, shows a
  DataflowOptionsForm and returns None.
  NOTE(review), upstream issues:
  1. The invalid output_name error passes line, not output_name, so the message quotes the whole
     magic line instead of the offending name.
  2. The output_name check parses as (output_name and not output_name.isidentifier()) or
     keyword.iskeyword(output_name). It appears to behave as intended, but explicit parentheses
     would make the precedence obvious.
  3. Splitting on whitespace and rejoining with single spaces collapses runs of whitespace inside
     SQL string literals and drops line breaks from multi-line queries.
  4. The final else raises ValueError with the format string and runner as separate arguments, so
     the message is never interpolated. The branch looks unreachable because unsupported runners
     return earlier.
--><div class="viewcode-block" id="BeamSqlMagics"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.sql.beam_sql_magics.html#apache_beam.runners.interactive.sql.beam_sql_magics.BeamSqlMagics">[docs]</a><span class="nd">@magics_class</span>
<span class="k">class</span> <span class="nc">BeamSqlMagics</span><span class="p">(</span><span class="n">Magics</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">shell</span><span class="p">):</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">shell</span><span class="p">)</span>
<span class="c1"># Eagerly initializes the environment.</span>
<span class="n">_</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_parser</span> <span class="o">=</span> <span class="n">BeamSqlParser</span><span class="p">()</span>
<div class="viewcode-block" id="BeamSqlMagics.beam_sql"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.sql.beam_sql_magics.html#apache_beam.runners.interactive.sql.beam_sql_magics.BeamSqlMagics.beam_sql">[docs]</a> <span class="nd">@line_cell_magic</span>
<span class="k">def</span> <span class="nf">beam_sql</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">line</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">cell</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="n">PValue</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;The beam_sql line/cell magic that executes a Beam SQL.</span>
<span class="sd"> Args:</span>
<span class="sd"> line: the string on the same line after the beam_sql magic.</span>
<span class="sd"> cell: everything else in the same notebook cell as a string. If None,</span>
<span class="sd"> beam_sql is used as line magic. Otherwise, cell magic.</span>
<span class="sd"> Returns None if running into an error or waiting for user input (running on</span>
<span class="sd"> a selected runner remotely), otherwise a PValue as if a SqlTransform is</span>
<span class="sd"> applied.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">input_str</span> <span class="o">=</span> <span class="n">line</span>
<span class="k">if</span> <span class="n">cell</span><span class="p">:</span>
<span class="n">input_str</span> <span class="o">+=</span> <span class="s1">&#39; &#39;</span> <span class="o">+</span> <span class="n">cell</span>
<span class="n">parsed</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_parser</span><span class="o">.</span><span class="n">parse</span><span class="p">(</span><span class="n">input_str</span><span class="o">.</span><span class="n">strip</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">())</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">parsed</span><span class="p">:</span>
<span class="c1"># Failed to parse inputs, let the parser handle the exit.</span>
<span class="k">return</span>
<span class="n">output_name</span> <span class="o">=</span> <span class="n">parsed</span><span class="o">.</span><span class="n">output_name</span>
<span class="n">verbose</span> <span class="o">=</span> <span class="n">parsed</span><span class="o">.</span><span class="n">verbose</span>
<span class="n">query</span> <span class="o">=</span> <span class="n">parsed</span><span class="o">.</span><span class="n">query</span>
<span class="n">runner</span> <span class="o">=</span> <span class="n">parsed</span><span class="o">.</span><span class="n">runner</span>
<span class="k">if</span> <span class="n">output_name</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">output_name</span><span class="o">.</span><span class="n">isidentifier</span><span class="p">()</span> <span class="ow">or</span> <span class="n">keyword</span><span class="o">.</span><span class="n">iskeyword</span><span class="p">(</span>
<span class="n">output_name</span><span class="p">):</span>
<span class="n">on_error</span><span class="p">(</span>
<span class="s1">&#39;The output_name &quot;</span><span class="si">%s</span><span class="s1">&quot; is not a valid identifier. Please supply a &#39;</span>
<span class="s1">&#39;valid identifier that is not a Python keyword.&#39;</span><span class="p">,</span>
<span class="n">line</span><span class="p">)</span>
<span class="k">return</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">query</span><span class="p">:</span>
<span class="n">on_error</span><span class="p">(</span><span class="s1">&#39;Please supply the SQL query to be executed.&#39;</span><span class="p">)</span>
<span class="k">return</span>
<span class="k">if</span> <span class="n">runner</span> <span class="ow">and</span> <span class="n">runner</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">_SUPPORTED_RUNNERS</span><span class="p">:</span>
<span class="n">on_error</span><span class="p">(</span>
<span class="s1">&#39;Runner &quot;</span><span class="si">%s</span><span class="s1">&quot; is not supported. Supported runners are </span><span class="si">%s</span><span class="s1">.&#39;</span><span class="p">,</span>
<span class="n">runner</span><span class="p">,</span>
<span class="n">_SUPPORTED_RUNNERS</span><span class="p">)</span>
<span class="k">return</span>
<span class="n">query</span> <span class="o">=</span> <span class="s1">&#39; &#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">query</span><span class="p">)</span>
<span class="n">found</span> <span class="o">=</span> <span class="n">find_pcolls</span><span class="p">(</span><span class="n">query</span><span class="p">,</span> <span class="n">pcoll_by_name</span><span class="p">(),</span> <span class="n">verbose</span><span class="o">=</span><span class="n">verbose</span><span class="p">)</span>
<span class="n">schemas</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="n">main_session</span> <span class="o">=</span> <span class="n">importlib</span><span class="o">.</span><span class="n">import_module</span><span class="p">(</span><span class="s1">&#39;__main__&#39;</span><span class="p">)</span>
<span class="k">for</span> <span class="n">_</span><span class="p">,</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">found</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">match_is_named_tuple</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">):</span>
<span class="n">on_error</span><span class="p">(</span>
<span class="s1">&#39;PCollection </span><span class="si">%s</span><span class="s1"> of type </span><span class="si">%s</span><span class="s1"> is not a NamedTuple. See &#39;</span>
<span class="s1">&#39;https://beam.apache.org/documentation/programming-guide/#schemas &#39;</span>
<span class="s1">&#39;for more details.&#39;</span><span class="p">,</span>
<span class="n">pcoll</span><span class="p">,</span>
<span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">)</span>
<span class="k">return</span>
<span class="n">register_coder_for_schema</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">,</span> <span class="n">verbose</span><span class="o">=</span><span class="n">verbose</span><span class="p">)</span>
<span class="c1"># Only care about schemas defined by the user in the main module.</span>
<span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">main_session</span><span class="p">,</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="o">.</span><span class="vm">__name__</span><span class="p">):</span>
<span class="n">schemas</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">)</span>
<span class="k">if</span> <span class="n">runner</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">&#39;DirectRunner&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">):</span>
<span class="n">collect_data_for_local_run</span><span class="p">(</span><span class="n">query</span><span class="p">,</span> <span class="n">found</span><span class="p">)</span>
<span class="n">output_name</span><span class="p">,</span> <span class="n">output</span><span class="p">,</span> <span class="n">chain</span> <span class="o">=</span> <span class="n">apply_sql</span><span class="p">(</span><span class="n">query</span><span class="p">,</span> <span class="n">output_name</span><span class="p">,</span> <span class="n">found</span><span class="p">)</span>
<span class="n">chain</span><span class="o">.</span><span class="n">current</span><span class="o">.</span><span class="n">schemas</span> <span class="o">=</span> <span class="n">schemas</span>
<span class="n">cache_output</span><span class="p">(</span><span class="n">output_name</span><span class="p">,</span> <span class="n">output</span><span class="p">)</span>
<span class="k">return</span> <span class="n">output</span>
<span class="n">output_name</span><span class="p">,</span> <span class="n">current_node</span><span class="p">,</span> <span class="n">chain</span> <span class="o">=</span> <span class="n">apply_sql</span><span class="p">(</span>
<span class="n">query</span><span class="p">,</span> <span class="n">output_name</span><span class="p">,</span> <span class="n">found</span><span class="p">,</span> <span class="kc">False</span><span class="p">)</span>
<span class="n">current_node</span><span class="o">.</span><span class="n">schemas</span> <span class="o">=</span> <span class="n">schemas</span>
<span class="c1"># TODO(BEAM-10708): Move the options setup and result handling to a</span>
<span class="c1"># separate module when more runners are supported.</span>
<span class="k">if</span> <span class="n">runner</span> <span class="o">==</span> <span class="s1">&#39;DataflowRunner&#39;</span><span class="p">:</span>
<span class="n">_</span> <span class="o">=</span> <span class="n">chain</span><span class="o">.</span><span class="n">to_pipeline</span><span class="p">()</span>
<span class="n">_</span> <span class="o">=</span> <span class="n">DataflowOptionsForm</span><span class="p">(</span>
<span class="n">output_name</span><span class="p">,</span> <span class="n">pcoll_by_name</span><span class="p">()[</span><span class="n">output_name</span><span class="p">],</span>
<span class="n">verbose</span><span class="p">)</span><span class="o">.</span><span class="n">display_for_input</span><span class="p">()</span>
<span class="k">return</span> <span class="kc">None</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Unsupported runner </span><span class="si">%s</span><span class="s1">.&#39;</span><span class="p">,</span> <span class="n">runner</span><span class="p">)</span></div></div>
<!-- Generated source listing (Sphinx viewcode markup, presumably): change the Python module, not this page. collect_data_for_local_run calls ib.collect on each PCollection in found; any exception other than KeyboardInterrupt/SystemExit is logged with the PCollection name and the SQL query, then re-raised. --><div class="viewcode-block" id="collect_data_for_local_run"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.sql.beam_sql_magics.html#apache_beam.runners.interactive.sql.beam_sql_magics.collect_data_for_local_run" aria-label="docs for collect_data_for_local_run">[docs]</a><span class="nd">@progress_indicated</span>
<span class="k">def</span> <span class="nf">collect_data_for_local_run</span><span class="p">(</span><span class="n">query</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">found</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">]):</span>
<span class="kn">from</span> <span class="nn">apache_beam.runners.interactive</span> <span class="kn">import</span> <span class="n">interactive_beam</span> <span class="k">as</span> <span class="n">ib</span>
<span class="k">for</span> <span class="n">name</span><span class="p">,</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">found</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">_</span> <span class="o">=</span> <span class="n">ib</span><span class="o">.</span><span class="n">collect</span><span class="p">(</span><span class="n">pcoll</span><span class="p">)</span>
<span class="k">except</span> <span class="p">(</span><span class="ne">KeyboardInterrupt</span><span class="p">,</span> <span class="ne">SystemExit</span><span class="p">):</span>
<span class="k">raise</span>
<span class="k">except</span><span class="p">:</span> <span class="c1"># pylint: disable=bare-except</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">error</span><span class="p">(</span>
<span class="s1">&#39;Cannot collect data for PCollection </span><span class="si">%s</span><span class="s1">. Please make sure the &#39;</span>
<span class="s1">&#39;PCollections queried in the sql &quot;</span><span class="si">%s</span><span class="s1">&quot; are all from a single &#39;</span>
<span class="s1">&#39;pipeline using an InteractiveRunner. Make sure there is no &#39;</span>
<span class="s1">&#39;ambiguity, for example, same named PCollections from multiple &#39;</span>
<span class="s1">&#39;pipelines or notebook re-executions.&#39;</span><span class="p">,</span>
<span class="n">name</span><span class="p">,</span>
<span class="n">query</span><span class="p">)</span>
<span class="k">raise</span></div>
<!-- Generated source listing: change the Python module, not this page. apply_sql resolves output_name via _generate_output_name and builds the source and SqlChain via _build_query_components. With run true it applies SqlTransform(query), binds the result in __main__ via create_var_in_main, logs its element_type and returns (output_name, PValue, chain); failures other than KeyboardInterrupt/SystemExit are passed to on_error and re-raised. With run false it returns (output_name, chain.current, chain) without applying the transform. --><div class="viewcode-block" id="apply_sql"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.sql.beam_sql_magics.html#apache_beam.runners.interactive.sql.beam_sql_magics.apply_sql" aria-label="docs for apply_sql">[docs]</a><span class="nd">@progress_indicated</span>
<span class="k">def</span> <span class="nf">apply_sql</span><span class="p">(</span>
<span class="n">query</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">output_name</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">],</span>
<span class="n">found</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">],</span>
<span class="n">run</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">True</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Tuple</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Union</span><span class="p">[</span><span class="n">PValue</span><span class="p">,</span> <span class="n">SqlNode</span><span class="p">],</span> <span class="n">SqlChain</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Applies a SqlTransform with the given sql and queried PCollections.</span>
<span class="sd"> Args:</span>
<span class="sd"> query: The SQL query executed in the magic.</span>
<span class="sd"> output_name: (optional) The output variable name in __main__ module.</span>
<span class="sd"> found: The PCollections with variable names found to be used in the query.</span>
<span class="sd"> run: Whether to prepare the SQL pipeline for a local run or not.</span>
<span class="sd"> Returns:</span>
<span class="sd"> A tuple of values. First str value is the output variable name in</span>
<span class="sd"> __main__ module, auto-generated if not provided. Second value: if run,</span>
<span class="sd"> it&#39;s a PValue; otherwise, a SqlNode tracks the SQL without applying it or</span>
<span class="sd"> executing it. Third value: SqlChain is a chain of SqlNodes that have been</span>
<span class="sd"> applied.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">output_name</span> <span class="o">=</span> <span class="n">_generate_output_name</span><span class="p">(</span><span class="n">output_name</span><span class="p">,</span> <span class="n">query</span><span class="p">,</span> <span class="n">found</span><span class="p">)</span>
<span class="n">query</span><span class="p">,</span> <span class="n">sql_source</span><span class="p">,</span> <span class="n">chain</span> <span class="o">=</span> <span class="n">_build_query_components</span><span class="p">(</span>
<span class="n">query</span><span class="p">,</span> <span class="n">found</span><span class="p">,</span> <span class="n">output_name</span><span class="p">,</span> <span class="n">run</span><span class="p">)</span>
<span class="k">if</span> <span class="n">run</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">output</span> <span class="o">=</span> <span class="n">sql_source</span> <span class="o">|</span> <span class="n">SqlTransform</span><span class="p">(</span><span class="n">query</span><span class="p">)</span>
<span class="c1"># Declare a variable with the output_name and output value in the</span>
<span class="c1"># __main__ module so that the user can use the output smoothly.</span>
<span class="n">output_name</span><span class="p">,</span> <span class="n">output</span> <span class="o">=</span> <span class="n">create_var_in_main</span><span class="p">(</span><span class="n">output_name</span><span class="p">,</span> <span class="n">output</span><span class="p">)</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s2">&quot;The output PCollection variable is </span><span class="si">%s</span><span class="s2"> with element_type </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span>
<span class="n">output_name</span><span class="p">,</span>
<span class="n">pformat_namedtuple</span><span class="p">(</span><span class="n">output</span><span class="o">.</span><span class="n">element_type</span><span class="p">))</span>
<span class="k">return</span> <span class="n">output_name</span><span class="p">,</span> <span class="n">output</span><span class="p">,</span> <span class="n">chain</span>
<span class="k">except</span> <span class="p">(</span><span class="ne">KeyboardInterrupt</span><span class="p">,</span> <span class="ne">SystemExit</span><span class="p">):</span>
<span class="k">raise</span>
<span class="k">except</span><span class="p">:</span> <span class="c1"># pylint: disable=bare-except</span>
<span class="n">on_error</span><span class="p">(</span><span class="s1">&#39;Error when applying the Beam SQL: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">traceback</span><span class="o">.</span><span class="n">format_exc</span><span class="p">())</span>
<span class="k">raise</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">output_name</span><span class="p">,</span> <span class="n">chain</span><span class="o">.</span><span class="n">current</span><span class="p">,</span> <span class="n">chain</span></div>
<!-- Generated source listing: change the Python module, not this page. pcolls_from_streaming_cache reuses the TestStreamServiceController registered for user_pipeline, or starts and registers a new one whose exception_handler logs errors; applies a single TestStream to query_pipeline with one output tag per CacheKey string; and maps each tagged output back to its variable name, copying element_type from the original PCollection. --><div class="viewcode-block" id="pcolls_from_streaming_cache"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.sql.beam_sql_magics.html#apache_beam.runners.interactive.sql.beam_sql_magics.pcolls_from_streaming_cache" aria-label="docs for pcolls_from_streaming_cache">[docs]</a><span class="k">def</span> <span class="nf">pcolls_from_streaming_cache</span><span class="p">(</span>
<span class="n">user_pipeline</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">,</span>
<span class="n">query_pipeline</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">,</span>
<span class="n">name_to_pcoll</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Reads PCollection cache through the TestStream.</span>
<span class="sd"> Args:</span>
<span class="sd"> user_pipeline: The beam.Pipeline object defined by the user in the</span>
<span class="sd"> notebook.</span>
<span class="sd"> query_pipeline: The beam.Pipeline object built by the magic to execute the</span>
<span class="sd"> SQL query.</span>
<span class="sd"> name_to_pcoll: PCollections with variable names used in the SQL query.</span>
<span class="sd"> Returns:</span>
<span class="sd"> A Dict[str, beam.PCollection], where each PCollection is tagged with</span>
<span class="sd"> their PCollection variable names, read from the cache.</span>
<span class="sd"> When the user_pipeline has unbounded sources, we force all cache reads to go</span>
<span class="sd"> through the TestStream even if they are bounded sources.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">exception_handler</span><span class="p">(</span><span class="n">e</span><span class="p">):</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">))</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="n">cache_manager</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">get_cache_manager</span><span class="p">(</span>
<span class="n">user_pipeline</span><span class="p">,</span> <span class="n">create_if_absent</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="n">test_stream_service</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">get_test_stream_service_controller</span><span class="p">(</span>
<span class="n">user_pipeline</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">test_stream_service</span><span class="p">:</span>
<span class="n">test_stream_service</span> <span class="o">=</span> <span class="n">TestStreamServiceController</span><span class="p">(</span>
<span class="n">cache_manager</span><span class="p">,</span> <span class="n">exception_handler</span><span class="o">=</span><span class="n">exception_handler</span><span class="p">)</span>
<span class="n">test_stream_service</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
<span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">set_test_stream_service_controller</span><span class="p">(</span>
<span class="n">user_pipeline</span><span class="p">,</span> <span class="n">test_stream_service</span><span class="p">)</span>
<span class="n">tag_to_name</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">for</span> <span class="n">name</span><span class="p">,</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">name_to_pcoll</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="n">key</span> <span class="o">=</span> <span class="n">CacheKey</span><span class="o">.</span><span class="n">from_pcoll</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">)</span><span class="o">.</span><span class="n">to_str</span><span class="p">()</span>
<span class="n">tag_to_name</span><span class="p">[</span><span class="n">key</span><span class="p">]</span> <span class="o">=</span> <span class="n">name</span>
<span class="n">output_pcolls</span> <span class="o">=</span> <span class="n">query_pipeline</span> <span class="o">|</span> <span class="n">test_stream</span><span class="o">.</span><span class="n">TestStream</span><span class="p">(</span>
<span class="n">output_tags</span><span class="o">=</span><span class="nb">set</span><span class="p">(</span><span class="n">tag_to_name</span><span class="o">.</span><span class="n">keys</span><span class="p">()),</span>
<span class="n">coder</span><span class="o">=</span><span class="n">cache_manager</span><span class="o">.</span><span class="n">_default_pcoder</span><span class="p">,</span>
<span class="n">endpoint</span><span class="o">=</span><span class="n">test_stream_service</span><span class="o">.</span><span class="n">endpoint</span><span class="p">)</span>
<span class="n">sql_source</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">output</span> <span class="ow">in</span> <span class="n">output_pcolls</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="n">name</span> <span class="o">=</span> <span class="n">tag_to_name</span><span class="p">[</span><span class="n">tag</span><span class="p">]</span>
<span class="c1"># Must mark the element_type to avoid introducing pickled Python coder</span>
<span class="c1"># to the Java expansion service.</span>
<span class="n">output</span><span class="o">.</span><span class="n">element_type</span> <span class="o">=</span> <span class="n">name_to_pcoll</span><span class="p">[</span><span class="n">name</span><span class="p">]</span><span class="o">.</span><span class="n">element_type</span>
<span class="n">sql_source</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">output</span>
<span class="k">return</span> <span class="n">sql_source</span></div>
<!-- Generated source listing: change the Python module, not this page. _generate_output_name returns a truthy output_name unchanged; otherwise it returns 'sql_output_' followed by the first 12 characters of obfuscate(query, found). --><span class="k">def</span> <span class="nf">_generate_output_name</span><span class="p">(</span>
<span class="n">output_name</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">],</span> <span class="n">query</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">found</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="nb">str</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Generates a unique output name if None is provided.</span>
<span class="sd"> Otherwise, returns the given output name directly.</span>
<span class="sd"> The generated output name is sql_output_{uuid} where uuid is an obfuscated</span>
<span class="sd"> value from the query and PCollections found to be used in the query.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">output_name</span><span class="p">:</span>
<span class="n">execution_id</span> <span class="o">=</span> <span class="n">obfuscate</span><span class="p">(</span><span class="n">query</span><span class="p">,</span> <span class="n">found</span><span class="p">)[:</span><span class="mi">12</span><span class="p">]</span>
<span class="n">output_name</span> <span class="o">=</span> <span class="s1">&#39;sql_output_&#39;</span> <span class="o">+</span> <span class="n">execution_id</span>
<span class="k">return</span> <span class="n">output_name</span>
<!-- Generated source listing: change the Python module, not this page. _build_query_components: when found is non-empty, it creates sql_pipeline with the user pipeline's options and registers it as a derived pipeline. If run is true, each input is read from cache (through a TestStream when has_source_to_cache(user_pipeline), otherwise via unreify_from_cache); if run is false, found is used directly. A single source is unwrapped to one PCollection after replace_single_pcoll_token rewrites the query, and a SqlNode is appended to the user pipeline's SQL chain. When found is empty, a new beam.Pipeline is registered as a user pipeline, used as the source, and a SqlNode for it is appended to that pipeline's SQL chain. --><span class="k">def</span> <span class="nf">_build_query_components</span><span class="p">(</span>
<span class="n">query</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">found</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">],</span>
<span class="n">output_name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">run</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">True</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Tuple</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span>
<span class="n">Union</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">],</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">],</span>
<span class="n">SqlChain</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Builds necessary components needed to apply the SqlTransform.</span>
<span class="sd"> Args:</span>
<span class="sd"> query: The SQL query to be executed by the magic.</span>
<span class="sd"> found: The PCollections with variable names found to be used by the query.</span>
<span class="sd"> output_name: The output variable name in __main__ module.</span>
<span class="sd"> run: Whether to prepare components for a local run or not.</span>
<span class="sd"> Returns:</span>
<span class="sd"> The processed query to be executed by the magic; a source to apply the</span>
<span class="sd"> SqlTransform to: a dictionary of tagged PCollections, or a single</span>
<span class="sd"> PCollection, or the pipeline to execute the query; the chain of applied</span>
<span class="sd"> beam_sql magics this one belongs to.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">found</span><span class="p">:</span>
<span class="n">user_pipeline</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">user_pipeline</span><span class="p">(</span>
<span class="nb">next</span><span class="p">(</span><span class="nb">iter</span><span class="p">(</span><span class="n">found</span><span class="o">.</span><span class="n">values</span><span class="p">()))</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span>
<span class="n">sql_pipeline</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">(</span><span class="n">options</span><span class="o">=</span><span class="n">user_pipeline</span><span class="o">.</span><span class="n">_options</span><span class="p">)</span>
<span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">add_derived_pipeline</span><span class="p">(</span><span class="n">user_pipeline</span><span class="p">,</span> <span class="n">sql_pipeline</span><span class="p">)</span>
<span class="n">sql_source</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">if</span> <span class="n">run</span><span class="p">:</span>
<span class="k">if</span> <span class="n">has_source_to_cache</span><span class="p">(</span><span class="n">user_pipeline</span><span class="p">):</span>
<span class="n">sql_source</span> <span class="o">=</span> <span class="n">pcolls_from_streaming_cache</span><span class="p">(</span>
<span class="n">user_pipeline</span><span class="p">,</span> <span class="n">sql_pipeline</span><span class="p">,</span> <span class="n">found</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">cache_manager</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">get_cache_manager</span><span class="p">(</span>
<span class="n">user_pipeline</span><span class="p">,</span> <span class="n">create_if_absent</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="k">for</span> <span class="n">pcoll_name</span><span class="p">,</span> <span class="n">pcoll</span> <span class="ow">in</span> <span class="n">found</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="n">cache_key</span> <span class="o">=</span> <span class="n">CacheKey</span><span class="o">.</span><span class="n">from_pcoll</span><span class="p">(</span><span class="n">pcoll_name</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">)</span><span class="o">.</span><span class="n">to_str</span><span class="p">()</span>
<span class="n">sql_source</span><span class="p">[</span><span class="n">pcoll_name</span><span class="p">]</span> <span class="o">=</span> <span class="n">unreify_from_cache</span><span class="p">(</span>
<span class="n">pipeline</span><span class="o">=</span><span class="n">sql_pipeline</span><span class="p">,</span>
<span class="n">cache_key</span><span class="o">=</span><span class="n">cache_key</span><span class="p">,</span>
<span class="n">cache_manager</span><span class="o">=</span><span class="n">cache_manager</span><span class="p">,</span>
<span class="n">element_type</span><span class="o">=</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">sql_source</span> <span class="o">=</span> <span class="n">found</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">sql_source</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="n">query</span> <span class="o">=</span> <span class="n">replace_single_pcoll_token</span><span class="p">(</span><span class="n">query</span><span class="p">,</span> <span class="nb">next</span><span class="p">(</span><span class="nb">iter</span><span class="p">(</span><span class="n">sql_source</span><span class="o">.</span><span class="n">keys</span><span class="p">())))</span>
<span class="n">sql_source</span> <span class="o">=</span> <span class="nb">next</span><span class="p">(</span><span class="nb">iter</span><span class="p">(</span><span class="n">sql_source</span><span class="o">.</span><span class="n">values</span><span class="p">()))</span>
<span class="n">node</span> <span class="o">=</span> <span class="n">SqlNode</span><span class="p">(</span>
<span class="n">output_name</span><span class="o">=</span><span class="n">output_name</span><span class="p">,</span> <span class="n">source</span><span class="o">=</span><span class="nb">set</span><span class="p">(</span><span class="n">found</span><span class="o">.</span><span class="n">keys</span><span class="p">()),</span> <span class="n">query</span><span class="o">=</span><span class="n">query</span><span class="p">)</span>
<span class="n">chain</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">get_sql_chain</span><span class="p">(</span>
<span class="n">user_pipeline</span><span class="p">,</span> <span class="n">set_user_pipeline</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">node</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span> <span class="c1"># does not query any existing PCollection</span>
<span class="n">sql_source</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">()</span>
<span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">add_user_pipeline</span><span class="p">(</span><span class="n">sql_source</span><span class="p">)</span>
<span class="c1"># The node should be the root node of the chain created below.</span>
<span class="n">node</span> <span class="o">=</span> <span class="n">SqlNode</span><span class="p">(</span><span class="n">output_name</span><span class="o">=</span><span class="n">output_name</span><span class="p">,</span> <span class="n">source</span><span class="o">=</span><span class="n">sql_source</span><span class="p">,</span> <span class="n">query</span><span class="o">=</span><span class="n">query</span><span class="p">)</span>
<span class="n">chain</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">get_sql_chain</span><span class="p">(</span><span class="n">sql_source</span><span class="p">)</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">node</span><span class="p">)</span>
<span class="k">return</span> <span class="n">query</span><span class="p">,</span> <span class="n">sql_source</span><span class="p">,</span> <span class="n">chain</span>
<!-- Generated source listing: change the Python module, not this page. cache_output looks up the user pipeline of output; if none is found it logs a warning and returns. Otherwise it reifies output into that pipeline's cache under its CacheKey and runs the pipeline until finished; a failure other than KeyboardInterrupt/SystemExit logs _NOT_SUPPORTED_MSG and returns. On success it marks the PCollection computed and visualizes it with unbounded max_n and max_duration_secs. --><div class="viewcode-block" id="cache_output"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.sql.beam_sql_magics.html#apache_beam.runners.interactive.sql.beam_sql_magics.cache_output" aria-label="docs for cache_output">[docs]</a><span class="nd">@progress_indicated</span>
<span class="k">def</span> <span class="nf">cache_output</span><span class="p">(</span><span class="n">output_name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">output</span><span class="p">:</span> <span class="n">PValue</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">user_pipeline</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">user_pipeline</span><span class="p">(</span><span class="n">output</span><span class="o">.</span><span class="n">pipeline</span><span class="p">)</span>
<span class="k">if</span> <span class="n">user_pipeline</span><span class="p">:</span>
<span class="n">cache_manager</span> <span class="o">=</span> <span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">get_cache_manager</span><span class="p">(</span>
<span class="n">user_pipeline</span><span class="p">,</span> <span class="n">create_if_absent</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;Something is wrong with </span><span class="si">%s</span><span class="s1">. Cannot introspect its data.&#39;</span><span class="p">,</span> <span class="n">output</span><span class="p">)</span>
<span class="k">return</span>
<span class="n">key</span> <span class="o">=</span> <span class="n">CacheKey</span><span class="o">.</span><span class="n">from_pcoll</span><span class="p">(</span><span class="n">output_name</span><span class="p">,</span> <span class="n">output</span><span class="p">)</span><span class="o">.</span><span class="n">to_str</span><span class="p">()</span>
<span class="n">_</span> <span class="o">=</span> <span class="n">reify_to_cache</span><span class="p">(</span><span class="n">pcoll</span><span class="o">=</span><span class="n">output</span><span class="p">,</span> <span class="n">cache_key</span><span class="o">=</span><span class="n">key</span><span class="p">,</span> <span class="n">cache_manager</span><span class="o">=</span><span class="n">cache_manager</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">output</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">run</span><span class="p">()</span><span class="o">.</span><span class="n">wait_until_finish</span><span class="p">()</span>
<span class="k">except</span> <span class="p">(</span><span class="ne">KeyboardInterrupt</span><span class="p">,</span> <span class="ne">SystemExit</span><span class="p">):</span>
<span class="k">raise</span>
<span class="k">except</span><span class="p">:</span> <span class="c1"># pylint: disable=bare-except</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="n">_NOT_SUPPORTED_MSG</span><span class="p">,</span> <span class="n">traceback</span><span class="o">.</span><span class="n">format_exc</span><span class="p">(),</span> <span class="n">output</span><span class="o">.</span><span class="n">pipeline</span><span class="o">.</span><span class="n">runner</span><span class="p">)</span>
<span class="k">return</span>
<span class="n">ie</span><span class="o">.</span><span class="n">current_env</span><span class="p">()</span><span class="o">.</span><span class="n">mark_pcollection_computed</span><span class="p">([</span><span class="n">output</span><span class="p">])</span>
<span class="n">visualize_computed_pcoll</span><span class="p">(</span>
<span class="n">output_name</span><span class="p">,</span> <span class="n">output</span><span class="p">,</span> <span class="n">max_n</span><span class="o">=</span><span class="nb">float</span><span class="p">(</span><span class="s1">&#39;inf&#39;</span><span class="p">),</span> <span class="n">max_duration_secs</span><span class="o">=</span><span class="nb">float</span><span class="p">(</span><span class="s1">&#39;inf&#39;</span><span class="p">))</span></div>
<!-- Sphinx viewcode rendering of load_ipython_extension(ipython): the function registers the BeamSqlMagics class on the given IPython shell via ipython.register_magics. The "[docs]" anchor links back to this function's entry in the API reference page. This markup mirrors the Python source; edit the source, not this listing. --><div class="viewcode-block" id="load_ipython_extension"><a class="viewcode-back" href="../../../../../apache_beam.runners.interactive.sql.beam_sql_magics.html#apache_beam.runners.interactive.sql.beam_sql_magics.load_ipython_extension">[docs]</a><span class="k">def</span> <span class="nf">load_ipython_extension</span><span class="p">(</span><span class="n">ipython</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Marks this module as an IPython extension.</span>
<span class="sd"> To load this magic in an IPython environment, execute:</span>
<span class="sd"> %load_ext apache_beam.runners.interactive.sql.beam_sql_magics.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">ipython</span><span class="o">.</span><span class="n">register_magics</span><span class="p">(</span><span class="n">BeamSqlMagics</span><span class="p">)</span></div>
</pre></div>
</div>
</div>
<footer>
<!-- Page footer: copyright notice plus build attribution (Sphinx + sphinx_rtd_theme). -->
<hr>
<div role="contentinfo">
<p>
<!-- NOTE(review): no copyright holder/year is rendered here; presumably the Sphinx `copyright` setting is empty. Confirm and fill in at the source. -->
&copy; Copyright
</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
  // Once the DOM is ready, turn on the Read the Docs theme's sidebar navigation
  // behaviour. The `true` argument is passed straight through to the theme;
  // presumably it enables the sticky-nav mode (confirm against theme.js).
  jQuery(function enableRtdNavigation() {
    SphinxRtdTheme.Navigation.enable(true);
  });
</script>
</body>
</html>