| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <!-- Document metadata: encoding first, then viewport and page title. --> |
| <meta charset="utf-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <title>apache_beam.yaml.yaml_mapping — Apache Beam 2.55.1 documentation</title> |
| |
| <!-- Scripts load synchronously in this order, except MathJax, which is async. --> |
| <script src="../../../_static/js/modernizr.min.js"></script> |
| <script id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script> |
| <script src="../../../_static/jquery.js"></script> |
| <script src="../../../_static/underscore.js"></script> |
| <script src="../../../_static/doctools.js"></script> |
| <script src="../../../_static/language_data.js"></script> |
| <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
| <script src="../../../_static/js/theme.js"></script> |
| |
| <!-- Stylesheets, followed by the index and search relation links. --> |
| <link rel="stylesheet" href="../../../_static/css/theme.css"> |
| <link rel="stylesheet" href="../../../_static/pygments.css"> |
| <link rel="index" title="Index" href="../../../genindex.html"> |
| <link rel="search" title="Search" href="../../../search.html"> |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <!-- Sidebar header: home link, version label and the search form. --> |
| <div class="wy-side-nav-search"> |
| |
| <a href="../../../index.html" class="icon icon-home"> Apache Beam |
| </a> |
| |
| <div class="version"> |
| 2.55.1 |
| </div> |
| |
| <!-- Search form: sends the query as "q" to search.html via GET, together with the hidden check_keywords and area fields. --> |
| <div role="search"> |
| <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get"> |
| <!-- A placeholder alone is not a reliable accessible name, so the field also carries an explicit aria-label. --> |
| <input type="text" name="q" placeholder="Search docs" aria-label="Search docs" /> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| </div> |
| |
| </div> |
| |
| <!-- Top-level table of contents: package pages first, then the standalone module pages. --> |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li> |
| </ul> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
| </ul> |
| |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
| <!-- Top bar: a bars icon carrying the data-toggle="wy-nav-top" hook, then a link back to the documentation index. NOTE(review): the icon has no text or accessible name; presumably theme.js binds the menu toggle to it, so confirm before changing the element. --> |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="../../../index.html">Apache Beam</a> |
| |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| <!-- Breadcrumb trail: Docs, then Module code, then the current module (plain text, not a link). --> |
| <ul class="wy-breadcrumbs"> |
| <li><a href="../../../index.html">Docs</a> »</li> |
| <li><a href="../../index.html">Module code</a> »</li> |
| <li>apache_beam.yaml.yaml_mapping</li> |
| <!-- Aside slot, left empty on this page. --> |
| <li class="wy-breadcrumbs-aside"> |
| </li> |
| </ul> |
| <hr> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.yaml.yaml_mapping</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="sd">"""This module defines the basic MapToFields operation."""</span> |
| <span class="kn">import</span> <span class="nn">functools</span> |
| <span class="kn">import</span> <span class="nn">inspect</span> |
| <span class="kn">import</span> <span class="nn">itertools</span> |
| <span class="kn">from</span> <span class="nn">collections</span> <span class="kn">import</span> <span class="n">abc</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Any</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Callable</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Collection</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Dict</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Mapping</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">NamedTuple</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Optional</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">TypeVar</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Union</span> |
| |
| <span class="kn">import</span> <span class="nn">js2py</span> |
| <span class="kn">from</span> <span class="nn">js2py</span> <span class="kn">import</span> <span class="n">base</span> |
| <span class="kn">from</span> <span class="nn">js2py.constructors</span> <span class="kn">import</span> <span class="n">jsdate</span> |
| <span class="kn">from</span> <span class="nn">js2py.internals</span> <span class="kn">import</span> <span class="n">simplex</span> |
| |
| <span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.filesystems</span> <span class="kn">import</span> <span class="n">FileSystems</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.portability.api</span> <span class="kn">import</span> <span class="n">schema_pb2</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">TimestampedValue</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="kn">import</span> <span class="n">row_type</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="kn">import</span> <span class="n">schemas</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="kn">import</span> <span class="n">trivial_inference</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.typehints.row_type</span> <span class="kn">import</span> <span class="n">RowTypeConstraint</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.typehints.schemas</span> <span class="kn">import</span> <span class="n">named_fields_from_element_type</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.utils</span> <span class="kn">import</span> <span class="n">python_callable</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.yaml</span> <span class="kn">import</span> <span class="n">json_utils</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.yaml</span> <span class="kn">import</span> <span class="n">options</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.yaml</span> <span class="kn">import</span> <span class="n">yaml_provider</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.yaml.yaml_provider</span> <span class="kn">import</span> <span class="n">dicts_to_rows</span> |
| |
| |
| <div class="viewcode-block" id="normalize_mapping"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.normalize_mapping">[docs]</a><span class="k">def</span> <span class="nf">normalize_mapping</span><span class="p">(</span><span class="n">spec</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Normalizes various fields for mapping transforms.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">spec</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'MapToFields'</span><span class="p">:</span> |
| <span class="n">config</span> <span class="o">=</span> <span class="n">spec</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'config'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">config</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'drop'</span><span class="p">),</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="n">config</span><span class="p">[</span><span class="s1">'drop'</span><span class="p">]</span> <span class="o">=</span> <span class="p">[</span><span class="n">config</span><span class="p">[</span><span class="s1">'drop'</span><span class="p">]]</span> |
| <span class="k">return</span> <span class="n">spec</span></div> |
| |
| |
| <span class="k">def</span> <span class="nf">_check_mapping_arguments</span><span class="p">(</span> |
| <span class="n">transform_name</span><span class="p">,</span> <span class="n">expression</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="nb">callable</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">path</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="c1"># Argument checking</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">expression</span> <span class="ow">and</span> <span class="ow">not</span> <span class="nb">callable</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">path</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">name</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s1">'</span><span class="si">{</span><span class="n">transform_name</span><span class="si">}</span><span class="s1"> must specify either "expression", "callable", '</span> |
| <span class="sa">f</span><span class="s1">'or both "path" and "name"'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">expression</span> <span class="ow">and</span> <span class="nb">callable</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s1">'</span><span class="si">{</span><span class="n">transform_name</span><span class="si">}</span><span class="s1"> cannot specify both "expression" and "callable"'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="p">(</span><span class="n">expression</span> <span class="ow">or</span> <span class="nb">callable</span><span class="p">)</span> <span class="ow">and</span> <span class="p">(</span><span class="n">path</span> <span class="ow">or</span> <span class="n">name</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s1">'</span><span class="si">{</span><span class="n">transform_name</span><span class="si">}</span><span class="s1"> cannot specify "expression" or "callable" with '</span> |
| <span class="sa">f</span><span class="s1">'"path" or "name"'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">path</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">name</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s1">'</span><span class="si">{</span><span class="n">transform_name</span><span class="si">}</span><span class="s1"> cannot specify "path" without "name"'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">name</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">path</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s1">'</span><span class="si">{</span><span class="n">transform_name</span><span class="si">}</span><span class="s1"> cannot specify "name" without "path"'</span><span class="p">)</span> |
| |
| |
| <span class="c1"># js2py's JsObjectWrapper object has a self-referencing __dict__ property</span> |
| <span class="c1"># that cannot be pickled without implementing the __getstate__ and</span> |
| <span class="c1"># __setstate__ methods.</span> |
| <span class="k">class</span> <span class="nc">_CustomJsObjectWrapper</span><span class="p">(</span><span class="n">js2py</span><span class="o">.</span><span class="n">base</span><span class="o">.</span><span class="n">JsObjectWrapper</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">js_obj</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">js_obj</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">[</span><span class="s1">'_obj'</span><span class="p">])</span> |
| |
| <span class="k">def</span> <span class="nf">__getstate__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">__setstate__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">state</span><span class="p">)</span> |
| |
| |
| <span class="c1"># TODO(yaml) Improve type inferencing for JS UDF's</span> |
| <div class="viewcode-block" id="py_value_to_js_dict"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.py_value_to_js_dict">[docs]</a><span class="k">def</span> <span class="nf">py_value_to_js_dict</span><span class="p">(</span><span class="n">py_value</span><span class="p">):</span> |
| <span class="k">if</span> <span class="p">((</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">py_value</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">py_value</span><span class="p">,</span> <span class="s1">'_asdict'</span><span class="p">))</span> <span class="ow">or</span> |
| <span class="nb">isinstance</span><span class="p">(</span><span class="n">py_value</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">Row</span><span class="p">)):</span> |
| <span class="n">py_value</span> <span class="o">=</span> <span class="n">py_value</span><span class="o">.</span><span class="n">_asdict</span><span class="p">()</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">py_value</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">{</span><span class="n">key</span><span class="p">:</span> <span class="n">py_value_to_js_dict</span><span class="p">(</span><span class="n">value</span><span class="p">)</span> <span class="k">for</span> <span class="n">key</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">py_value</span><span class="o">.</span><span class="n">items</span><span class="p">()}</span> |
| <span class="k">elif</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">py_value</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">py_value</span><span class="p">,</span> <span class="n">abc</span><span class="o">.</span><span class="n">Iterable</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">[</span><span class="n">py_value_to_js_dict</span><span class="p">(</span><span class="n">value</span><span class="p">)</span> <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">py_value</span><span class="p">)]</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">py_value</span></div> |
| |
| |
| <span class="c1"># TODO(yaml) Consider adding optional language version parameter to support</span> |
| <span class="c1"># ECMAScript 5 and 6</span> |
| <span class="k">def</span> <span class="nf">_expand_javascript_mapping_func</span><span class="p">(</span> |
| <span class="n">original_fields</span><span class="p">,</span> <span class="n">expression</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="nb">callable</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">path</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">name</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| |
| <span class="n">js_array_type</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">base</span><span class="o">.</span><span class="n">PyJsArray</span><span class="p">,</span> |
| <span class="n">base</span><span class="o">.</span><span class="n">PyJsArrayBuffer</span><span class="p">,</span> |
| <span class="n">base</span><span class="o">.</span><span class="n">PyJsInt8Array</span><span class="p">,</span> |
| <span class="n">base</span><span class="o">.</span><span class="n">PyJsUint8Array</span><span class="p">,</span> |
| <span class="n">base</span><span class="o">.</span><span class="n">PyJsUint8ClampedArray</span><span class="p">,</span> |
| <span class="n">base</span><span class="o">.</span><span class="n">PyJsInt16Array</span><span class="p">,</span> |
| <span class="n">base</span><span class="o">.</span><span class="n">PyJsUint16Array</span><span class="p">,</span> |
| <span class="n">base</span><span class="o">.</span><span class="n">PyJsInt32Array</span><span class="p">,</span> |
| <span class="n">base</span><span class="o">.</span><span class="n">PyJsUint32Array</span><span class="p">,</span> |
| <span class="n">base</span><span class="o">.</span><span class="n">PyJsFloat32Array</span><span class="p">,</span> |
| <span class="n">base</span><span class="o">.</span><span class="n">PyJsFloat64Array</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">_js_object_to_py_object</span><span class="p">(</span><span class="n">obj</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="p">(</span><span class="n">base</span><span class="o">.</span><span class="n">PyJsNumber</span><span class="p">,</span> <span class="n">base</span><span class="o">.</span><span class="n">PyJsString</span><span class="p">,</span> <span class="n">base</span><span class="o">.</span><span class="n">PyJsBoolean</span><span class="p">)):</span> |
| <span class="k">return</span> <span class="n">base</span><span class="o">.</span><span class="n">to_python</span><span class="p">(</span><span class="n">obj</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">js_array_type</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">[</span><span class="n">_js_object_to_py_object</span><span class="p">(</span><span class="n">value</span><span class="p">)</span> <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">obj</span><span class="o">.</span><span class="n">to_list</span><span class="p">()]</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">jsdate</span><span class="o">.</span><span class="n">PyJsDate</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">obj</span><span class="o">.</span><span class="n">to_utc_dt</span><span class="p">()</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="p">(</span><span class="n">base</span><span class="o">.</span><span class="n">PyJsNull</span><span class="p">,</span> <span class="n">base</span><span class="o">.</span><span class="n">PyJsUndefined</span><span class="p">)):</span> |
| <span class="k">return</span> <span class="kc">None</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">base</span><span class="o">.</span><span class="n">PyJsError</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span><span class="n">obj</span><span class="p">[</span><span class="s1">'message'</span><span class="p">])</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">base</span><span class="o">.</span><span class="n">PyJsObject</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="n">key</span><span class="p">:</span> <span class="n">_js_object_to_py_object</span><span class="p">(</span><span class="n">value</span><span class="p">[</span><span class="s1">'value'</span><span class="p">])</span> |
| <span class="k">for</span> <span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span> <span class="ow">in</span> <span class="n">obj</span><span class="o">.</span><span class="n">own</span><span class="o">.</span><span class="n">items</span><span class="p">()</span> |
| <span class="p">}</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">base</span><span class="o">.</span><span class="n">JsObjectWrapper</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">_js_object_to_py_object</span><span class="p">(</span><span class="n">obj</span><span class="o">.</span><span class="n">_obj</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">obj</span> |
| |
| <span class="k">if</span> <span class="n">expression</span><span class="p">:</span> |
| <span class="n">source</span> <span class="o">=</span> <span class="s1">'</span><span class="se">\n</span><span class="s1">'</span><span class="o">.</span><span class="n">join</span><span class="p">([</span><span class="s1">'function(__row__) {'</span><span class="p">]</span> <span class="o">+</span> <span class="p">[</span> |
| <span class="sa">f</span><span class="s1">' </span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1"> = __row__.</span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1">'</span> |
| <span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">original_fields</span> <span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">expression</span> |
| <span class="p">]</span> <span class="o">+</span> <span class="p">[</span><span class="s1">' return ('</span> <span class="o">+</span> <span class="n">expression</span> <span class="o">+</span> <span class="s1">')'</span><span class="p">]</span> <span class="o">+</span> <span class="p">[</span><span class="s1">'}'</span><span class="p">])</span> |
| <span class="n">js_func</span> <span class="o">=</span> <span class="n">_CustomJsObjectWrapper</span><span class="p">(</span><span class="n">js2py</span><span class="o">.</span><span class="n">eval_js</span><span class="p">(</span><span class="n">source</span><span class="p">))</span> |
| |
| <span class="k">elif</span> <span class="nb">callable</span><span class="p">:</span> |
| <span class="n">js_func</span> <span class="o">=</span> <span class="n">_CustomJsObjectWrapper</span><span class="p">(</span><span class="n">js2py</span><span class="o">.</span><span class="n">eval_js</span><span class="p">(</span><span class="nb">callable</span><span class="p">))</span> |
| |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">path</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="s1">'.js'</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s1">'File "</span><span class="si">{</span><span class="n">path</span><span class="si">}</span><span class="s1">" is not a valid .js file.'</span><span class="p">)</span> |
| <span class="n">udf_code</span> <span class="o">=</span> <span class="n">FileSystems</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">path</span><span class="p">)</span><span class="o">.</span><span class="n">read</span><span class="p">()</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span> |
| <span class="n">js</span> <span class="o">=</span> <span class="n">js2py</span><span class="o">.</span><span class="n">EvalJs</span><span class="p">()</span> |
| <span class="n">js</span><span class="o">.</span><span class="n">eval</span><span class="p">(</span><span class="n">udf_code</span><span class="p">)</span> |
| <span class="n">js_func</span> <span class="o">=</span> <span class="n">_CustomJsObjectWrapper</span><span class="p">(</span><span class="nb">getattr</span><span class="p">(</span><span class="n">js</span><span class="p">,</span> <span class="n">name</span><span class="p">))</span> |
| |
| <span class="k">def</span> <span class="nf">js_wrapper</span><span class="p">(</span><span class="n">row</span><span class="p">):</span> |
| <span class="n">row_as_dict</span> <span class="o">=</span> <span class="n">py_value_to_js_dict</span><span class="p">(</span><span class="n">row</span><span class="p">)</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">js_result</span> <span class="o">=</span> <span class="n">js_func</span><span class="p">(</span><span class="n">row_as_dict</span><span class="p">)</span> |
| <span class="k">except</span> <span class="n">simplex</span><span class="o">.</span><span class="n">JsException</span> <span class="k">as</span> <span class="n">exn</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s2">"Error evaluating javascript expression: "</span> |
| <span class="sa">f</span><span class="s2">"</span><span class="si">{</span><span class="n">exn</span><span class="o">.</span><span class="n">mes</span><span class="p">[</span><span class="s1">'message'</span><span class="p">]</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span> <span class="kn">from</span> <span class="nn">exn</span> |
| <span class="k">return</span> <span class="n">dicts_to_rows</span><span class="p">(</span><span class="n">_js_object_to_py_object</span><span class="p">(</span><span class="n">js_result</span><span class="p">))</span> |
| |
| <span class="k">return</span> <span class="n">js_wrapper</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_expand_python_mapping_func</span><span class="p">(</span> |
| <span class="n">original_fields</span><span class="p">,</span> <span class="n">expression</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="nb">callable</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">path</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">name</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">path</span> <span class="ow">and</span> <span class="n">name</span><span class="p">:</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">path</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="s1">'.py'</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s1">'File "</span><span class="si">{</span><span class="n">path</span><span class="si">}</span><span class="s1">" is not a valid .py file.'</span><span class="p">)</span> |
| <span class="n">py_file</span> <span class="o">=</span> <span class="n">FileSystems</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">path</span><span class="p">)</span><span class="o">.</span><span class="n">read</span><span class="p">()</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span> |
| |
| <span class="k">return</span> <span class="n">python_callable</span><span class="o">.</span><span class="n">PythonCallableWithSource</span><span class="o">.</span><span class="n">load_from_script</span><span class="p">(</span> |
| <span class="n">py_file</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span> |
| |
| <span class="k">elif</span> <span class="n">expression</span><span class="p">:</span> |
| <span class="c1"># TODO(robertwb): Consider constructing a single callable that takes</span> |
| <span class="c1"># the row and returns the new row, rather than invoking (and unpacking)</span> |
| <span class="c1"># for each field individually.</span> |
| <span class="n">source</span> <span class="o">=</span> <span class="s1">'</span><span class="se">\n</span><span class="s1">'</span><span class="o">.</span><span class="n">join</span><span class="p">([</span><span class="s1">'def fn(__row__):'</span><span class="p">]</span> <span class="o">+</span> <span class="p">[</span> |
| <span class="sa">f</span><span class="s1">' </span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1"> = __row__.</span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1">'</span> |
| <span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">original_fields</span> <span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">expression</span> |
| <span class="p">]</span> <span class="o">+</span> <span class="p">[</span><span class="s1">' return ('</span> <span class="o">+</span> <span class="n">expression</span> <span class="o">+</span> <span class="s1">')'</span><span class="p">])</span> |
| |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">source</span> <span class="o">=</span> <span class="nb">callable</span> |
| |
| <span class="k">return</span> <span class="n">python_callable</span><span class="o">.</span><span class="n">PythonCallableWithSource</span><span class="p">(</span><span class="n">source</span><span class="p">)</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_validator</span><span class="p">(</span><span class="n">beam_type</span><span class="p">:</span> <span class="n">schema_pb2</span><span class="o">.</span><span class="n">FieldType</span><span class="p">)</span> <span class="o">-></span> <span class="n">Callable</span><span class="p">[[</span><span class="n">Any</span><span class="p">],</span> <span class="nb">bool</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""Returns a callable checking whether a value conforms to the given type."""</span> |
| <span class="n">type_info</span> <span class="o">=</span> <span class="n">beam_type</span><span class="o">.</span><span class="n">WhichOneof</span><span class="p">(</span><span class="s2">"type_info"</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">type_info</span> <span class="o">==</span> <span class="s2">"atomic_type"</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">beam_type</span><span class="o">.</span><span class="n">atomic_type</span> <span class="o">==</span> <span class="n">schema_pb2</span><span class="o">.</span><span class="n">BOOLEAN</span><span class="p">:</span> |
| <span class="k">return</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="nb">bool</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">beam_type</span><span class="o">.</span><span class="n">atomic_type</span> <span class="o">==</span> <span class="n">schema_pb2</span><span class="o">.</span><span class="n">INT64</span><span class="p">:</span> |
| <span class="k">return</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="nb">int</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">beam_type</span><span class="o">.</span><span class="n">atomic_type</span> <span class="o">==</span> <span class="n">schema_pb2</span><span class="o">.</span><span class="n">DOUBLE</span><span class="p">:</span> |
| <span class="k">return</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="nb">float</span><span class="p">))</span> |
| <span class="k">elif</span> <span class="n">beam_type</span><span class="o">.</span><span class="n">atomic_type</span> <span class="o">==</span> <span class="n">schema_pb2</span><span class="o">.</span><span class="n">STRING</span><span class="p">:</span> |
| <span class="k">return</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s1">'Unknown or unsupported atomic type: </span><span class="si">{</span><span class="n">beam_type</span><span class="o">.</span><span class="n">atomic_type</span><span class="si">}</span><span class="s1">'</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">type_info</span> <span class="o">==</span> <span class="s2">"array_type"</span><span class="p">:</span> |
| <span class="n">element_validator</span> <span class="o">=</span> <span class="n">_validator</span><span class="p">(</span><span class="n">beam_type</span><span class="o">.</span><span class="n">array_type</span><span class="o">.</span><span class="n">element_type</span><span class="p">)</span> |
| <span class="k">return</span> <span class="k">lambda</span> <span class="n">value</span><span class="p">:</span> <span class="nb">all</span><span class="p">(</span><span class="n">element_validator</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">value</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">type_info</span> <span class="o">==</span> <span class="s2">"iterable_type"</span><span class="p">:</span> |
| <span class="n">element_validator</span> <span class="o">=</span> <span class="n">_validator</span><span class="p">(</span><span class="n">beam_type</span><span class="o">.</span><span class="n">iterable_type</span><span class="o">.</span><span class="n">element_type</span><span class="p">)</span> |
| <span class="k">return</span> <span class="k">lambda</span> <span class="n">value</span><span class="p">:</span> <span class="nb">all</span><span class="p">(</span><span class="n">element_validator</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">value</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">type_info</span> <span class="o">==</span> <span class="s2">"map_type"</span><span class="p">:</span> |
| <span class="n">key_validator</span> <span class="o">=</span> <span class="n">_validator</span><span class="p">(</span><span class="n">beam_type</span><span class="o">.</span><span class="n">map_type</span><span class="o">.</span><span class="n">key_type</span><span class="p">)</span> |
| <span class="n">value_validator</span> <span class="o">=</span> <span class="n">_validator</span><span class="p">(</span><span class="n">beam_type</span><span class="o">.</span><span class="n">map_type</span><span class="o">.</span><span class="n">value_type</span><span class="p">)</span> |
| <span class="k">return</span> <span class="k">lambda</span> <span class="n">value</span><span class="p">:</span> <span class="nb">all</span><span class="p">(</span> |
| <span class="n">key_validator</span><span class="p">(</span><span class="n">k</span><span class="p">)</span> <span class="ow">and</span> <span class="n">value_validator</span><span class="p">(</span><span class="n">v</span><span class="p">)</span> <span class="k">for</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">)</span> <span class="ow">in</span> <span class="n">value</span><span class="o">.</span><span class="n">items</span><span class="p">())</span> |
| <span class="k">elif</span> <span class="n">type_info</span> <span class="o">==</span> <span class="s2">"row_type"</span><span class="p">:</span> |
| <span class="n">validators</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="n">field</span><span class="o">.</span><span class="n">name</span><span class="p">:</span> <span class="n">_validator</span><span class="p">(</span><span class="n">field</span><span class="o">.</span><span class="n">type</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">field</span> <span class="ow">in</span> <span class="n">beam_type</span><span class="o">.</span><span class="n">row_type</span><span class="o">.</span><span class="n">schema</span><span class="o">.</span><span class="n">fields</span> |
| <span class="p">}</span> |
| <span class="k">return</span> <span class="k">lambda</span> <span class="n">row</span><span class="p">:</span> <span class="nb">all</span><span class="p">(</span> |
| <span class="n">validator</span><span class="p">(</span><span class="nb">getattr</span><span class="p">(</span><span class="n">row</span><span class="p">,</span> <span class="n">name</span><span class="p">))</span> |
| <span class="k">for</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">validator</span><span class="p">)</span> <span class="ow">in</span> <span class="n">validators</span><span class="o">.</span><span class="n">items</span><span class="p">())</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Unrecognized type_info: </span><span class="si">{</span><span class="n">type_info</span><span class="si">!r}</span><span class="s2">"</span><span class="p">)</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_as_callable_for_pcoll</span><span class="p">(</span> |
| <span class="n">pcoll</span><span class="p">,</span> |
| <span class="n">fn_spec</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]],</span> |
| <span class="n">msg</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">language</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]):</span> |
| <span class="k">if</span> <span class="n">language</span> <span class="o">==</span> <span class="s1">'javascript'</span><span class="p">:</span> |
| <span class="n">options</span><span class="o">.</span><span class="n">YamlOptions</span><span class="o">.</span><span class="n">check_enabled</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="p">,</span> <span class="s1">'javascript'</span><span class="p">)</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">input_schema</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">named_fields_from_element_type</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">))</span> |
| <span class="k">except</span> <span class="p">(</span><span class="ne">TypeError</span><span class="p">,</span> <span class="ne">ValueError</span><span class="p">)</span> <span class="k">as</span> <span class="n">exn</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">is_expr</span><span class="p">(</span><span class="n">fn_spec</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Can only use expressions on a schema'd input."</span><span class="p">)</span> <span class="kn">from</span> <span class="nn">exn</span> |
| <span class="n">input_schema</span> <span class="o">=</span> <span class="p">{}</span> <span class="c1"># unused</span> |
| |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">fn_spec</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span> <span class="ow">and</span> <span class="n">fn_spec</span> <span class="ow">in</span> <span class="n">input_schema</span><span class="p">:</span> |
| <span class="k">return</span> <span class="k">lambda</span> <span class="n">row</span><span class="p">:</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">row</span><span class="p">,</span> <span class="n">fn_spec</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">_as_callable</span><span class="p">(</span><span class="nb">list</span><span class="p">(</span><span class="n">input_schema</span><span class="o">.</span><span class="n">keys</span><span class="p">()),</span> <span class="n">fn_spec</span><span class="p">,</span> <span class="n">msg</span><span class="p">,</span> <span class="n">language</span><span class="p">)</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_as_callable</span><span class="p">(</span><span class="n">original_fields</span><span class="p">,</span> <span class="n">expr</span><span class="p">,</span> <span class="n">transform_name</span><span class="p">,</span> <span class="n">language</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">expr</span> <span class="ow">in</span> <span class="n">original_fields</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">expr</span> |
| |
| <span class="c1"># TODO(yaml): support an imports parameter</span> |
| <span class="c1"># TODO(yaml): support a requirements parameter (possibly at a higher level)</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expr</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="n">expr</span> <span class="o">=</span> <span class="p">{</span><span class="s1">'expression'</span><span class="p">:</span> <span class="n">expr</span><span class="p">}</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expr</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s2">"Ambiguous expression type (perhaps missing quoting?): </span><span class="si">{</span><span class="n">expr</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span> |
| <span class="n">explicit_type</span> <span class="o">=</span> <span class="n">expr</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">'output_type'</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> |
| <span class="n">_check_mapping_arguments</span><span class="p">(</span><span class="n">transform_name</span><span class="p">,</span> <span class="o">**</span><span class="n">expr</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">language</span> <span class="o">==</span> <span class="s2">"javascript"</span><span class="p">:</span> |
| <span class="n">func</span> <span class="o">=</span> <span class="n">_expand_javascript_mapping_func</span><span class="p">(</span><span class="n">original_fields</span><span class="p">,</span> <span class="o">**</span><span class="n">expr</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">language</span> <span class="o">==</span> <span class="s2">"python"</span><span class="p">:</span> |
| <span class="n">func</span> <span class="o">=</span> <span class="n">_expand_python_mapping_func</span><span class="p">(</span><span class="n">original_fields</span><span class="p">,</span> <span class="o">**</span><span class="n">expr</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s1">'Unknown language for mapping transform: </span><span class="si">{</span><span class="n">language</span><span class="si">}</span><span class="s1">. '</span> |
| <span class="s1">'Supported languages are "javascript" and "python."'</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">explicit_type</span><span class="p">:</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">explicit_type</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="n">explicit_type</span> <span class="o">=</span> <span class="p">{</span><span class="s1">'type'</span><span class="p">:</span> <span class="n">explicit_type</span><span class="p">}</span> |
| <span class="n">beam_type</span> <span class="o">=</span> <span class="n">json_utils</span><span class="o">.</span><span class="n">json_type_to_beam_type</span><span class="p">(</span><span class="n">explicit_type</span><span class="p">)</span> |
| <span class="n">validator</span> <span class="o">=</span> <span class="n">_validator</span><span class="p">(</span><span class="n">beam_type</span><span class="p">)</span> |
| |
| <span class="nd">@beam</span><span class="o">.</span><span class="n">typehints</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">schemas</span><span class="o">.</span><span class="n">typing_from_runner_api</span><span class="p">(</span><span class="n">beam_type</span><span class="p">))</span> |
| <span class="k">def</span> <span class="nf">checking_func</span><span class="p">(</span><span class="n">row</span><span class="p">):</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="n">func</span><span class="p">(</span><span class="n">row</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">validator</span><span class="p">(</span><span class="n">result</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="sa">f</span><span class="s1">'</span><span class="si">{</span><span class="n">result</span><span class="si">}</span><span class="s1"> violates schema </span><span class="si">{</span><span class="n">explicit_type</span><span class="si">}</span><span class="s1">'</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">result</span> |
| |
| <span class="k">return</span> <span class="n">checking_func</span> |
| |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">func</span> |
| |
| |
| <div class="viewcode-block" id="ErrorHandlingConfig"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.ErrorHandlingConfig">[docs]</a><span class="k">class</span> <span class="nc">ErrorHandlingConfig</span><span class="p">(</span><span class="n">NamedTuple</span><span class="p">):</span> |
| <span class="n">output</span><span class="p">:</span> <span class="nb">str</span></div> |
| <span class="c1"># TODO: Other parameters are valid here too, but not common to Java.</span> |
| |
| |
| <div class="viewcode-block" id="exception_handling_args"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.exception_handling_args">[docs]</a><span class="k">def</span> <span class="nf">exception_handling_args</span><span class="p">(</span><span class="n">error_handling_spec</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">error_handling_spec</span><span class="p">:</span> |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="s1">'dead_letter_tag'</span> <span class="k">if</span> <span class="n">k</span> <span class="o">==</span> <span class="s1">'output'</span> <span class="k">else</span> <span class="n">k</span><span class="p">:</span> <span class="n">v</span> |
| <span class="k">for</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">)</span> <span class="ow">in</span> <span class="n">error_handling_spec</span><span class="o">.</span><span class="n">items</span><span class="p">()</span> |
| <span class="p">}</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="kc">None</span></div> |
| |
| |
| <span class="k">def</span> <span class="nf">_map_errors_to_standard_format</span><span class="p">(</span><span class="n">input_type</span><span class="p">):</span> |
| <span class="c1"># TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple.</span> |
| |
| <span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span> |
| <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">Row</span><span class="p">(</span><span class="n">element</span><span class="o">=</span><span class="n">x</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">msg</span><span class="o">=</span><span class="nb">str</span><span class="p">(</span><span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">][</span><span class="mi">1</span><span class="p">]),</span> <span class="n">stack</span><span class="o">=</span><span class="nb">str</span><span class="p">(</span><span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">][</span><span class="mi">2</span><span class="p">]))</span> |
| <span class="p">)</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span> |
| <span class="n">RowTypeConstraint</span><span class="o">.</span><span class="n">from_fields</span><span class="p">([(</span><span class="s2">"element"</span><span class="p">,</span> <span class="n">input_type</span><span class="p">),</span> <span class="p">(</span><span class="s2">"msg"</span><span class="p">,</span> <span class="nb">str</span><span class="p">),</span> |
| <span class="p">(</span><span class="s2">"stack"</span><span class="p">,</span> <span class="nb">str</span><span class="p">)]))</span> |
| |
| |
| <div class="viewcode-block" id="maybe_with_exception_handling"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.maybe_with_exception_handling">[docs]</a><span class="k">def</span> <span class="nf">maybe_with_exception_handling</span><span class="p">(</span><span class="n">inner_expand</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="n">wrapped_pcoll</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">core</span><span class="o">.</span><span class="n">_MaybePValueWithErrors</span><span class="p">(</span> |
| <span class="n">pcoll</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_exception_handling_args</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">inner_expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">wrapped_pcoll</span><span class="p">)</span><span class="o">.</span><span class="n">as_result</span><span class="p">(</span> |
| <span class="n">_map_errors_to_standard_format</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">))</span> |
| |
| <span class="k">return</span> <span class="n">expand</span></div> |
| |
| |
| <div class="viewcode-block" id="maybe_with_exception_handling_transform_fn"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.maybe_with_exception_handling_transform_fn">[docs]</a><span class="k">def</span> <span class="nf">maybe_with_exception_handling_transform_fn</span><span class="p">(</span><span class="n">transform_fn</span><span class="p">):</span> |
| <span class="nd">@functools</span><span class="o">.</span><span class="n">wraps</span><span class="p">(</span><span class="n">transform_fn</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">error_handling</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="n">wrapped_pcoll</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">core</span><span class="o">.</span><span class="n">_MaybePValueWithErrors</span><span class="p">(</span> |
| <span class="n">pcoll</span><span class="p">,</span> <span class="n">exception_handling_args</span><span class="p">(</span><span class="n">error_handling</span><span class="p">))</span> |
| <span class="k">return</span> <span class="n">transform_fn</span><span class="p">(</span><span class="n">wrapped_pcoll</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span><span class="o">.</span><span class="n">as_result</span><span class="p">(</span> |
| <span class="n">_map_errors_to_standard_format</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">))</span> |
| |
| <span class="n">original_signature</span> <span class="o">=</span> <span class="n">inspect</span><span class="o">.</span><span class="n">signature</span><span class="p">(</span><span class="n">transform_fn</span><span class="p">)</span> |
| <span class="n">new_parameters</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">original_signature</span><span class="o">.</span><span class="n">parameters</span><span class="o">.</span><span class="n">values</span><span class="p">())</span> |
| <span class="n">error_handling_param</span> <span class="o">=</span> <span class="n">inspect</span><span class="o">.</span><span class="n">Parameter</span><span class="p">(</span> |
| <span class="s1">'error_handling'</span><span class="p">,</span> |
| <span class="n">inspect</span><span class="o">.</span><span class="n">Parameter</span><span class="o">.</span><span class="n">KEYWORD_ONLY</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">annotation</span><span class="o">=</span><span class="n">ErrorHandlingConfig</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">new_parameters</span><span class="p">[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">kind</span> <span class="o">==</span> <span class="n">inspect</span><span class="o">.</span><span class="n">Parameter</span><span class="o">.</span><span class="n">VAR_KEYWORD</span><span class="p">:</span> |
| <span class="n">new_parameters</span><span class="o">.</span><span class="n">insert</span><span class="p">(</span><span class="o">-</span><span class="mi">1</span><span class="p">,</span> <span class="n">error_handling_param</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">new_parameters</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">error_handling_param</span><span class="p">)</span> |
| <span class="n">expand</span><span class="o">.</span><span class="n">__signature__</span> <span class="o">=</span> <span class="n">original_signature</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="n">parameters</span><span class="o">=</span><span class="n">new_parameters</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">expand</span></div> |
| |
| |
| <span class="k">class</span> <span class="nc">_Explode</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Explodes (aka unnest/flatten) one or more fields producing multiple rows.</span> |
| |
| <span class="sd"> Given one or more fields of iterable type, produces multiple rows, one for</span> |
| <span class="sd"> each value of that field. For example, a row of the form `('a', [1, 2, 3])`</span> |
| <span class="sd"> would expand to `('a', 1)`, `('a', 2)`, and `('a', 3)` when exploded on</span> |
| <span class="sd"> the second field.</span> |
| |
| <span class="sd"> This is akin to a `FlatMap` when paired with the MapToFields transform.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> fields: The list of fields to expand.</span> |
| <span class="sd"> cross_product: If multiple fields are specified, indicates whether the</span> |
| <span class="sd"> full cross-product of combinations should be produced, or if the</span> |
| <span class="sd"> first element of the first field corresponds to the first element</span> |
| <span class="sd"> of the second field, etc. For example, the row</span> |
| <span class="sd"> `(['a', 'b'], [1, 2])` would expand to the four rows</span> |
| <span class="sd"> `('a', 1)`, `('a', 2)`, `('b', 1)`, and `('b', 2)` when</span> |
| <span class="sd"> `cross_product` is set to `true` but only the two rows</span> |
| <span class="sd"> `('a', 1)` and `('b', 2)` when it is set to `false`.</span> |
| <span class="sd"> Only meaningful (and required) if multiple fields are specified.</span> |
| <span class="sd"> error_handling: Whether and how to handle errors during iteration.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">fields</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Collection</span><span class="p">[</span><span class="nb">str</span><span class="p">]],</span> |
| <span class="n">cross_product</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">error_handling</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Mapping</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">fields</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="n">fields</span> <span class="o">=</span> <span class="p">[</span><span class="n">fields</span><span class="p">]</span> |
| <span class="k">if</span> <span class="n">cross_product</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">fields</span><span class="p">)</span> <span class="o">></span> <span class="mi">1</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'cross_product must be specified true or false '</span> |
| <span class="s1">'when exploding multiple fields'</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># Doesn't matter.</span> |
| <span class="n">cross_product</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_fields</span> <span class="o">=</span> <span class="n">fields</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_cross_product</span> <span class="o">=</span> <span class="n">cross_product</span> |
| <span class="c1"># TODO(yaml): Support standard error handling argument.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_exception_handling_args</span> <span class="o">=</span> <span class="n">exception_handling_args</span><span class="p">(</span><span class="n">error_handling</span><span class="p">)</span> |
| |
| <span class="nd">@maybe_with_exception_handling</span> |
| <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="n">all_fields</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="n">x</span> <span class="k">for</span> <span class="n">x</span><span class="p">,</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">named_fields_from_element_type</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">)</span> |
| <span class="p">]</span> |
| <span class="k">for</span> <span class="n">field</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fields</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">field</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">all_fields</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s1">'Exploding unknown field "</span><span class="si">{</span><span class="n">field</span><span class="si">}</span><span class="s1">"'</span><span class="p">)</span> |
| <span class="n">to_explode</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fields</span> |
| |
| <span class="k">def</span> <span class="nf">explode_cross_product</span><span class="p">(</span><span class="n">base</span><span class="p">,</span> <span class="n">fields</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">fields</span><span class="p">:</span> |
| <span class="n">copy</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">base</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">base</span><span class="p">[</span><span class="n">fields</span><span class="p">[</span><span class="mi">0</span><span class="p">]]:</span> |
| <span class="n">copy</span><span class="p">[</span><span class="n">fields</span><span class="p">[</span><span class="mi">0</span><span class="p">]]</span> <span class="o">=</span> <span class="n">value</span> |
| <span class="k">yield from</span> <span class="n">explode_cross_product</span><span class="p">(</span><span class="n">copy</span><span class="p">,</span> <span class="n">fields</span><span class="p">[</span><span class="mi">1</span><span class="p">:])</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">beam</span><span class="o">.</span><span class="n">Row</span><span class="p">(</span><span class="o">**</span><span class="n">base</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">explode_zip</span><span class="p">(</span><span class="n">base</span><span class="p">,</span> <span class="n">fields</span><span class="p">):</span> |
| <span class="n">to_zip</span> <span class="o">=</span> <span class="p">[</span><span class="n">base</span><span class="p">[</span><span class="n">field</span><span class="p">]</span> <span class="k">for</span> <span class="n">field</span> <span class="ow">in</span> <span class="n">fields</span><span class="p">]</span> |
| <span class="n">copy</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">base</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">values</span> <span class="ow">in</span> <span class="n">itertools</span><span class="o">.</span><span class="n">zip_longest</span><span class="p">(</span><span class="o">*</span><span class="n">to_zip</span><span class="p">,</span> <span class="n">fillvalue</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">for</span> <span class="n">ix</span><span class="p">,</span> <span class="n">field</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="n">fields</span><span class="p">):</span> |
| <span class="n">copy</span><span class="p">[</span><span class="n">field</span><span class="p">]</span> <span class="o">=</span> <span class="n">values</span><span class="p">[</span><span class="n">ix</span><span class="p">]</span> |
| <span class="k">yield</span> <span class="n">beam</span><span class="o">.</span><span class="n">Row</span><span class="p">(</span><span class="o">**</span><span class="n">copy</span><span class="p">)</span> |
| |
| <span class="n">cross_product</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cross_product</span> |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="n">pcoll</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span> |
| <span class="k">lambda</span> <span class="n">row</span><span class="p">:</span> |
| <span class="p">(</span><span class="n">explode_cross_product</span> <span class="k">if</span> <span class="n">cross_product</span> <span class="k">else</span> <span class="n">explode_zip</span><span class="p">)</span> |
| <span class="p">({</span><span class="n">name</span><span class="p">:</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">row</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">all_fields</span><span class="p">},</span> <span class="n">to_explode</span><span class="p">)))</span> |
| |
| <span class="k">def</span> <span class="nf">infer_output_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">row_type</span><span class="o">.</span><span class="n">RowTypeConstraint</span><span class="o">.</span><span class="n">from_fields</span><span class="p">([(</span> |
| <span class="n">name</span><span class="p">,</span> |
| <span class="n">trivial_inference</span><span class="o">.</span><span class="n">element_type</span><span class="p">(</span><span class="n">typ</span><span class="p">)</span> <span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fields</span> <span class="k">else</span> |
| <span class="n">typ</span><span class="p">)</span> <span class="k">for</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">typ</span><span class="p">)</span> <span class="ow">in</span> <span class="n">named_fields_from_element_type</span><span class="p">(</span><span class="n">input_type</span><span class="p">)])</span> |
| |
| <span class="k">def</span> <span class="nf">with_exception_handling</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="c1"># It's possible there's an error in iteration...</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_exception_handling_args</span> <span class="o">=</span> <span class="n">kwargs</span> |
| <span class="k">return</span> <span class="bp">self</span> |
| |
| |
| <span class="nd">@beam</span><span class="o">.</span><span class="n">ptransform</span><span class="o">.</span><span class="n">ptransform_fn</span> |
| <span class="nd">@maybe_with_exception_handling_transform_fn</span> |
| <span class="k">def</span> <span class="nf">_PyJsFilter</span><span class="p">(</span> |
| <span class="n">pcoll</span><span class="p">,</span> <span class="n">keep</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]],</span> <span class="n">language</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">):</span> |
| <span class="n">keep_fn</span> <span class="o">=</span> <span class="n">_as_callable_for_pcoll</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">keep</span><span class="p">,</span> <span class="s2">"keep"</span><span class="p">,</span> <span class="n">language</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Filter</span><span class="p">(</span><span class="n">keep_fn</span><span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="is_expr"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.is_expr">[docs]</a><span class="k">def</span> <span class="nf">is_expr</span><span class="p">(</span><span class="n">v</span><span class="p">):</span> |
| <span class="k">return</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span> <span class="ow">or</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">dict</span><span class="p">)</span> <span class="ow">and</span> <span class="s1">'expression'</span> <span class="ow">in</span> <span class="n">v</span><span class="p">)</span></div> |
| |
| |
| <div class="viewcode-block" id="normalize_fields"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.normalize_fields">[docs]</a><span class="k">def</span> <span class="nf">normalize_fields</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">fields</span><span class="p">,</span> <span class="n">drop</span><span class="o">=</span><span class="p">(),</span> <span class="n">append</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">language</span><span class="o">=</span><span class="s1">'generic'</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">input_schema</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">named_fields_from_element_type</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">))</span> |
| <span class="k">except</span> <span class="p">(</span><span class="ne">TypeError</span><span class="p">,</span> <span class="ne">ValueError</span><span class="p">)</span> <span class="k">as</span> <span class="n">exn</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">drop</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Can only drop fields on a schema'd input."</span><span class="p">)</span> <span class="kn">from</span> <span class="nn">exn</span> |
| <span class="k">if</span> <span class="n">append</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Can only append fields on a schema'd input."</span><span class="p">)</span> <span class="kn">from</span> <span class="nn">exn</span> |
| <span class="k">elif</span> <span class="nb">any</span><span class="p">(</span><span class="n">is_expr</span><span class="p">(</span><span class="n">x</span><span class="p">)</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">fields</span><span class="o">.</span><span class="n">values</span><span class="p">()):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Can only use expressions on a schema'd input."</span><span class="p">)</span> <span class="kn">from</span> <span class="nn">exn</span> |
| <span class="n">input_schema</span> <span class="o">=</span> <span class="p">{}</span> |
| |
| <span class="k">if</span> <span class="n">drop</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">append</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Can only drop fields if append is true."</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">drop</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">input_schema</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s1">'Dropping unknown field "</span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1">"'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">append</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">fields</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">input_schema</span> <span class="ow">and</span> <span class="n">name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">drop</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s1">'Redefinition of field "</span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1">". '</span> |
| <span class="s1">'Cannot append a field that already exists in original input.'</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">language</span> <span class="o">==</span> <span class="s1">'generic'</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">expr</span> <span class="ow">in</span> <span class="n">fields</span><span class="o">.</span><span class="n">values</span><span class="p">():</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expr</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"Missing language specification. "</span> |
| <span class="s2">"Must specify a language when using a map with custom logic."</span><span class="p">)</span> |
| <span class="n">missing</span> <span class="o">=</span> <span class="nb">set</span><span class="p">(</span><span class="n">fields</span><span class="o">.</span><span class="n">values</span><span class="p">())</span> <span class="o">-</span> <span class="nb">set</span><span class="p">(</span><span class="n">input_schema</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span> |
| <span class="k">if</span> <span class="n">missing</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s2">"Missing language specification or unknown input fields: </span><span class="si">{</span><span class="n">missing</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">append</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">input_schema</span><span class="p">,</span> <span class="p">{</span> |
| <span class="o">**</span><span class="p">{</span><span class="n">name</span><span class="p">:</span> <span class="n">name</span> |
| <span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">input_schema</span><span class="o">.</span><span class="n">keys</span><span class="p">()</span> <span class="k">if</span> <span class="n">name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">drop</span><span class="p">},</span> |
| <span class="o">**</span><span class="n">fields</span> |
| <span class="p">}</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">input_schema</span><span class="p">,</span> <span class="n">fields</span></div> |
| |
| |
| <span class="nd">@beam</span><span class="o">.</span><span class="n">ptransform</span><span class="o">.</span><span class="n">ptransform_fn</span> |
| <span class="nd">@maybe_with_exception_handling_transform_fn</span> |
| <span class="k">def</span> <span class="nf">_PyJsMapToFields</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">language</span><span class="o">=</span><span class="s1">'generic'</span><span class="p">,</span> <span class="o">**</span><span class="n">mapping_args</span><span class="p">):</span> |
| <span class="n">input_schema</span><span class="p">,</span> <span class="n">fields</span> <span class="o">=</span> <span class="n">normalize_fields</span><span class="p">(</span> |
| <span class="n">pcoll</span><span class="p">,</span> <span class="n">language</span><span class="o">=</span><span class="n">language</span><span class="p">,</span> <span class="o">**</span><span class="n">mapping_args</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">language</span> <span class="o">==</span> <span class="s1">'javascript'</span><span class="p">:</span> |
| <span class="n">options</span><span class="o">.</span><span class="n">YamlOptions</span><span class="o">.</span><span class="n">check_enabled</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="p">,</span> <span class="s1">'javascript'</span><span class="p">)</span> |
| |
| <span class="n">original_fields</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">input_schema</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span> |
| |
| <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Select</span><span class="p">(</span> |
| <span class="o">**</span><span class="p">{</span> |
| <span class="n">name</span><span class="p">:</span> <span class="n">_as_callable</span><span class="p">(</span><span class="n">original_fields</span><span class="p">,</span> <span class="n">expr</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">language</span><span class="p">)</span> |
| <span class="k">for</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">expr</span><span class="p">)</span> <span class="ow">in</span> <span class="n">fields</span><span class="o">.</span><span class="n">items</span><span class="p">()</span> |
| <span class="p">})</span> |
| |
| |
| <span class="nd">@beam</span><span class="o">.</span><span class="n">ptransform</span><span class="o">.</span><span class="n">ptransform_fn</span> |
| <span class="k">def</span> <span class="nf">_SqlFilterTransform</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">sql_transform_constructor</span><span class="p">,</span> <span class="n">keep</span><span class="p">,</span> <span class="n">language</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">sql_transform_constructor</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s1">'SELECT * FROM PCOLLECTION WHERE </span><span class="si">{</span><span class="n">keep</span><span class="si">}</span><span class="s1">'</span><span class="p">)</span> |
| |
| |
| <span class="nd">@beam</span><span class="o">.</span><span class="n">ptransform</span><span class="o">.</span><span class="n">ptransform_fn</span> |
| <span class="k">def</span> <span class="nf">_SqlMapToFieldsTransform</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">sql_transform_constructor</span><span class="p">,</span> <span class="o">**</span><span class="n">mapping_args</span><span class="p">):</span> |
| <span class="n">_</span><span class="p">,</span> <span class="n">fields</span> <span class="o">=</span> <span class="n">normalize_fields</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="o">**</span><span class="n">mapping_args</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">extract_expr</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">v</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">v</span> |
| <span class="k">elif</span> <span class="s1">'expression'</span> <span class="ow">in</span> <span class="n">v</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">v</span><span class="p">[</span><span class="s1">'expression'</span><span class="p">]</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Only expressions allowed in SQL at </span><span class="si">{name}</span><span class="s2">."</span><span class="p">)</span> |
| |
| <span class="n">selects</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="sa">f</span><span class="s1">'(</span><span class="si">{</span><span class="n">extract_expr</span><span class="p">(</span><span class="n">name</span><span class="p">,</span><span class="w"> </span><span class="n">expr</span><span class="p">)</span><span class="si">}</span><span class="s1">) AS </span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1">'</span> |
| <span class="k">for</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">expr</span><span class="p">)</span> <span class="ow">in</span> <span class="n">fields</span><span class="o">.</span><span class="n">items</span><span class="p">()</span> |
| <span class="p">]</span> |
| <span class="n">query</span> <span class="o">=</span> <span class="s2">"SELECT "</span> <span class="o">+</span> <span class="s2">", "</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">selects</span><span class="p">)</span> <span class="o">+</span> <span class="s2">" FROM PCOLLECTION"</span> |
| <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">sql_transform_constructor</span><span class="p">(</span><span class="n">query</span><span class="p">)</span> |
| |
| |
| <span class="nd">@beam</span><span class="o">.</span><span class="n">ptransform</span><span class="o">.</span><span class="n">ptransform_fn</span> |
| <span class="k">def</span> <span class="nf">_AssignTimestamps</span><span class="p">(</span> |
| <span class="n">pcoll</span><span class="p">,</span> |
| <span class="n">timestamp</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]],</span> |
| <span class="n">language</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">):</span> |
| <span class="n">timestamp_fn</span> <span class="o">=</span> <span class="n">_as_callable_for_pcoll</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">,</span> <span class="s1">'timestamp'</span><span class="p">,</span> <span class="n">language</span><span class="p">)</span> |
| <span class="n">T</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s1">'T'</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">TimestampedValue</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">timestamp_fn</span><span class="p">(</span><span class="n">x</span><span class="p">))</span> |
| <span class="p">)</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">T</span><span class="p">)</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">T</span><span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="create_mapping_providers"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.create_mapping_providers">[docs]</a><span class="k">def</span> <span class="nf">create_mapping_providers</span><span class="p">():</span> |
| <span class="c1"># These are MetaInlineProviders because their expansion is in terms of other</span> |
| <span class="c1"># YamlTransforms, but in a way that needs to be deferred until the input</span> |
| <span class="c1"># schema is known.</span> |
| <span class="k">return</span> <span class="p">[</span> |
| <span class="n">yaml_provider</span><span class="o">.</span><span class="n">InlineProvider</span><span class="p">({</span> |
| <span class="s1">'AssignTimestamps-python'</span><span class="p">:</span> <span class="n">_AssignTimestamps</span><span class="p">,</span> |
| <span class="s1">'AssignTimestamps-javascript'</span><span class="p">:</span> <span class="n">_AssignTimestamps</span><span class="p">,</span> |
| <span class="s1">'AssignTimestamps-generic'</span><span class="p">:</span> <span class="n">_AssignTimestamps</span><span class="p">,</span> |
| <span class="s1">'Explode'</span><span class="p">:</span> <span class="n">_Explode</span><span class="p">,</span> |
| <span class="s1">'Filter-python'</span><span class="p">:</span> <span class="n">_PyJsFilter</span><span class="p">,</span> |
| <span class="s1">'Filter-javascript'</span><span class="p">:</span> <span class="n">_PyJsFilter</span><span class="p">,</span> |
| <span class="s1">'MapToFields-python'</span><span class="p">:</span> <span class="n">_PyJsMapToFields</span><span class="p">,</span> |
| <span class="s1">'MapToFields-javascript'</span><span class="p">:</span> <span class="n">_PyJsMapToFields</span><span class="p">,</span> |
| <span class="s1">'MapToFields-generic'</span><span class="p">:</span> <span class="n">_PyJsMapToFields</span><span class="p">,</span> |
| <span class="p">}),</span> |
| <span class="n">yaml_provider</span><span class="o">.</span><span class="n">SqlBackedProvider</span><span class="p">({</span> |
| <span class="s1">'Filter-sql'</span><span class="p">:</span> <span class="n">_SqlFilterTransform</span><span class="p">,</span> |
| <span class="s1">'Filter-calcite'</span><span class="p">:</span> <span class="n">_SqlFilterTransform</span><span class="p">,</span> |
| <span class="s1">'MapToFields-sql'</span><span class="p">:</span> <span class="n">_SqlMapToFieldsTransform</span><span class="p">,</span> |
| <span class="s1">'MapToFields-calcite'</span><span class="p">:</span> <span class="n">_SqlMapToFieldsTransform</span><span class="p">,</span> |
| <span class="p">}),</span> |
| <span class="p">]</span></div> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| <footer> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <p> |
| © Copyright |
| |
| </p> |
| </div> |
| Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <script type="text/javascript"> |
| jQuery(function () { |
| SphinxRtdTheme.Navigation.enable(true); |
| }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |