blob: 67663b7ab6ac176c03dcaf4c39cb951fffa3b7c0 [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>apache_beam.yaml.yaml_mapping — Apache Beam 2.55.1 documentation</title>

  <!-- Scripts are kept synchronous and in their original order; only the
       MathJax loader is async. NOTE(review): later scripts presumably rely on
       jquery.js / underscore.js being loaded first, so do not reorder. -->
  <script src="../../../_static/js/modernizr.min.js"></script>
  <script id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script>
  <script src="../../../_static/jquery.js"></script>
  <script src="../../../_static/underscore.js"></script>
  <script src="../../../_static/doctools.js"></script>
  <script src="../../../_static/language_data.js"></script>
  <!-- NOTE(review): third-party CDN script is loaded without an integrity
       attribute; confirm whether pinning with SRI is feasible for this loader. -->
  <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
  <script src="../../../_static/js/theme.js"></script>

  <!-- Stylesheets: theme first, then syntax-highlighting colours. -->
  <link rel="stylesheet" href="../../../_static/css/theme.css">
  <link rel="stylesheet" href="../../../_static/pygments.css">

  <!-- Document relations to the generated index and search pages. -->
  <link rel="index" title="Index" href="../../../genindex.html">
  <link rel="search" title="Search" href="../../../search.html">
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
  <!-- Sidebar: project home link, version label, search form, and the
       package/module table of contents. -->
  <div class="wy-side-scroll">
    <div class="wy-side-nav-search">
      <a href="../../../index.html" class="icon icon-home"> Apache Beam
      </a>
      <div class="version">
        2.55.1
      </div>
      <div role="search">
        <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
          <!-- A placeholder is not an accessible name, so aria-label gives
               the query field one without changing the visual layout. -->
          <input type="text" name="q" placeholder="Search docs" aria-label="Search docs">
          <input type="hidden" name="check_keywords" value="yes">
          <input type="hidden" name="area" value="default">
        </form>
      </div>
    </div>
    <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
      <!-- Packages -->
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li>
      </ul>
      <!-- Top-level modules -->
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
      </ul>
    </div>
  </div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
  <!-- Top bar: a bars icon carrying the wy-nav-top toggle hook, followed by
       a link back to the documentation index. -->
  <i class="fa fa-bars" data-toggle="wy-nav-top"></i>
  <a href="../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
  <!-- Breadcrumb trail: Docs » Module code » current module, followed by an
       empty "aside" slot and a rule separating it from the page body. -->
  <ul class="wy-breadcrumbs">
    <li><a href="../../../index.html">Docs</a> »</li>
    <li><a href="../../index.html">Module code</a> »</li>
    <li>apache_beam.yaml.yaml_mapping</li>
    <li class="wy-breadcrumbs-aside">
    </li>
  </ul>
  <hr>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.yaml.yaml_mapping</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;This module defines the basic MapToFields operation.&quot;&quot;&quot;</span>
<span class="kn">import</span> <span class="nn">functools</span>
<span class="kn">import</span> <span class="nn">inspect</span>
<span class="kn">import</span> <span class="nn">itertools</span>
<span class="kn">from</span> <span class="nn">collections</span> <span class="kn">import</span> <span class="n">abc</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Any</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Callable</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Collection</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Dict</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Mapping</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">NamedTuple</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Optional</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">TypeVar</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Union</span>
<span class="kn">import</span> <span class="nn">js2py</span>
<span class="kn">from</span> <span class="nn">js2py</span> <span class="kn">import</span> <span class="n">base</span>
<span class="kn">from</span> <span class="nn">js2py.constructors</span> <span class="kn">import</span> <span class="n">jsdate</span>
<span class="kn">from</span> <span class="nn">js2py.internals</span> <span class="kn">import</span> <span class="n">simplex</span>
<span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.filesystems</span> <span class="kn">import</span> <span class="n">FileSystems</span>
<span class="kn">from</span> <span class="nn">apache_beam.portability.api</span> <span class="kn">import</span> <span class="n">schema_pb2</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">TimestampedValue</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="kn">import</span> <span class="n">row_type</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="kn">import</span> <span class="n">schemas</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints</span> <span class="kn">import</span> <span class="n">trivial_inference</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints.row_type</span> <span class="kn">import</span> <span class="n">RowTypeConstraint</span>
<span class="kn">from</span> <span class="nn">apache_beam.typehints.schemas</span> <span class="kn">import</span> <span class="n">named_fields_from_element_type</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils</span> <span class="kn">import</span> <span class="n">python_callable</span>
<span class="kn">from</span> <span class="nn">apache_beam.yaml</span> <span class="kn">import</span> <span class="n">json_utils</span>
<span class="kn">from</span> <span class="nn">apache_beam.yaml</span> <span class="kn">import</span> <span class="n">options</span>
<span class="kn">from</span> <span class="nn">apache_beam.yaml</span> <span class="kn">import</span> <span class="n">yaml_provider</span>
<span class="kn">from</span> <span class="nn">apache_beam.yaml.yaml_provider</span> <span class="kn">import</span> <span class="n">dicts_to_rows</span>
<div class="viewcode-block" id="normalize_mapping"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.normalize_mapping">[docs]</a><span class="k">def</span> <span class="nf">normalize_mapping</span><span class="p">(</span><span class="n">spec</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Normalizes various fields for mapping transforms.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">spec</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;MapToFields&#39;</span><span class="p">:</span>
<span class="n">config</span> <span class="o">=</span> <span class="n">spec</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;config&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">config</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;drop&#39;</span><span class="p">),</span> <span class="nb">str</span><span class="p">):</span>
<span class="n">config</span><span class="p">[</span><span class="s1">&#39;drop&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="p">[</span><span class="n">config</span><span class="p">[</span><span class="s1">&#39;drop&#39;</span><span class="p">]]</span>
<span class="k">return</span> <span class="n">spec</span></div>
<span class="k">def</span> <span class="nf">_check_mapping_arguments</span><span class="p">(</span>
<span class="n">transform_name</span><span class="p">,</span> <span class="n">expression</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="nb">callable</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">path</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="c1"># Argument checking</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">expression</span> <span class="ow">and</span> <span class="ow">not</span> <span class="nb">callable</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">path</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">name</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="sa">f</span><span class="s1">&#39;</span><span class="si">{</span><span class="n">transform_name</span><span class="si">}</span><span class="s1"> must specify either &quot;expression&quot;, &quot;callable&quot;, &#39;</span>
<span class="sa">f</span><span class="s1">&#39;or both &quot;path&quot; and &quot;name&quot;&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">expression</span> <span class="ow">and</span> <span class="nb">callable</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="sa">f</span><span class="s1">&#39;</span><span class="si">{</span><span class="n">transform_name</span><span class="si">}</span><span class="s1"> cannot specify both &quot;expression&quot; and &quot;callable&quot;&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="p">(</span><span class="n">expression</span> <span class="ow">or</span> <span class="nb">callable</span><span class="p">)</span> <span class="ow">and</span> <span class="p">(</span><span class="n">path</span> <span class="ow">or</span> <span class="n">name</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="sa">f</span><span class="s1">&#39;</span><span class="si">{</span><span class="n">transform_name</span><span class="si">}</span><span class="s1"> cannot specify &quot;expression&quot; or &quot;callable&quot; with &#39;</span>
<span class="sa">f</span><span class="s1">&#39;&quot;path&quot; or &quot;name&quot;&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">path</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">name</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s1">&#39;</span><span class="si">{</span><span class="n">transform_name</span><span class="si">}</span><span class="s1"> cannot specify &quot;path&quot; without &quot;name&quot;&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">name</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">path</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s1">&#39;</span><span class="si">{</span><span class="n">transform_name</span><span class="si">}</span><span class="s1"> cannot specify &quot;name&quot; without &quot;path&quot;&#39;</span><span class="p">)</span>
<span class="c1"># js2py&#39;s JsObjectWrapper object has a self-referencing __dict__ property</span>
<span class="c1"># that cannot be pickled without implementing the __getstate__ and</span>
<span class="c1"># __setstate__ methods.</span>
<span class="k">class</span> <span class="nc">_CustomJsObjectWrapper</span><span class="p">(</span><span class="n">js2py</span><span class="o">.</span><span class="n">base</span><span class="o">.</span><span class="n">JsObjectWrapper</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">js_obj</span><span class="p">):</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">js_obj</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">[</span><span class="s1">&#39;_obj&#39;</span><span class="p">])</span>
<span class="k">def</span> <span class="nf">__getstate__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">__setstate__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">state</span><span class="p">)</span>
<span class="c1"># TODO(yaml) Improve type inferencing for JS UDF&#39;s</span>
<div class="viewcode-block" id="py_value_to_js_dict"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.py_value_to_js_dict">[docs]</a><span class="k">def</span> <span class="nf">py_value_to_js_dict</span><span class="p">(</span><span class="n">py_value</span><span class="p">):</span>
<span class="k">if</span> <span class="p">((</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">py_value</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">py_value</span><span class="p">,</span> <span class="s1">&#39;_asdict&#39;</span><span class="p">))</span> <span class="ow">or</span>
<span class="nb">isinstance</span><span class="p">(</span><span class="n">py_value</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">Row</span><span class="p">)):</span>
<span class="n">py_value</span> <span class="o">=</span> <span class="n">py_value</span><span class="o">.</span><span class="n">_asdict</span><span class="p">()</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">py_value</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="n">key</span><span class="p">:</span> <span class="n">py_value_to_js_dict</span><span class="p">(</span><span class="n">value</span><span class="p">)</span> <span class="k">for</span> <span class="n">key</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">py_value</span><span class="o">.</span><span class="n">items</span><span class="p">()}</span>
<span class="k">elif</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">py_value</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">py_value</span><span class="p">,</span> <span class="n">abc</span><span class="o">.</span><span class="n">Iterable</span><span class="p">):</span>
<span class="k">return</span> <span class="p">[</span><span class="n">py_value_to_js_dict</span><span class="p">(</span><span class="n">value</span><span class="p">)</span> <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">py_value</span><span class="p">)]</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">py_value</span></div>
<span class="c1"># TODO(yaml) Consider adding optional language version parameter to support</span>
<span class="c1"># ECMAScript 5 and 6</span>
<span class="k">def</span> <span class="nf">_expand_javascript_mapping_func</span><span class="p">(</span>
<span class="n">original_fields</span><span class="p">,</span> <span class="n">expression</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="nb">callable</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">path</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">name</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="n">js_array_type</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">base</span><span class="o">.</span><span class="n">PyJsArray</span><span class="p">,</span>
<span class="n">base</span><span class="o">.</span><span class="n">PyJsArrayBuffer</span><span class="p">,</span>
<span class="n">base</span><span class="o">.</span><span class="n">PyJsInt8Array</span><span class="p">,</span>
<span class="n">base</span><span class="o">.</span><span class="n">PyJsUint8Array</span><span class="p">,</span>
<span class="n">base</span><span class="o">.</span><span class="n">PyJsUint8ClampedArray</span><span class="p">,</span>
<span class="n">base</span><span class="o">.</span><span class="n">PyJsInt16Array</span><span class="p">,</span>
<span class="n">base</span><span class="o">.</span><span class="n">PyJsUint16Array</span><span class="p">,</span>
<span class="n">base</span><span class="o">.</span><span class="n">PyJsInt32Array</span><span class="p">,</span>
<span class="n">base</span><span class="o">.</span><span class="n">PyJsUint32Array</span><span class="p">,</span>
<span class="n">base</span><span class="o">.</span><span class="n">PyJsFloat32Array</span><span class="p">,</span>
<span class="n">base</span><span class="o">.</span><span class="n">PyJsFloat64Array</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_js_object_to_py_object</span><span class="p">(</span><span class="n">obj</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="p">(</span><span class="n">base</span><span class="o">.</span><span class="n">PyJsNumber</span><span class="p">,</span> <span class="n">base</span><span class="o">.</span><span class="n">PyJsString</span><span class="p">,</span> <span class="n">base</span><span class="o">.</span><span class="n">PyJsBoolean</span><span class="p">)):</span>
<span class="k">return</span> <span class="n">base</span><span class="o">.</span><span class="n">to_python</span><span class="p">(</span><span class="n">obj</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">js_array_type</span><span class="p">):</span>
<span class="k">return</span> <span class="p">[</span><span class="n">_js_object_to_py_object</span><span class="p">(</span><span class="n">value</span><span class="p">)</span> <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">obj</span><span class="o">.</span><span class="n">to_list</span><span class="p">()]</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">jsdate</span><span class="o">.</span><span class="n">PyJsDate</span><span class="p">):</span>
<span class="k">return</span> <span class="n">obj</span><span class="o">.</span><span class="n">to_utc_dt</span><span class="p">()</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="p">(</span><span class="n">base</span><span class="o">.</span><span class="n">PyJsNull</span><span class="p">,</span> <span class="n">base</span><span class="o">.</span><span class="n">PyJsUndefined</span><span class="p">)):</span>
<span class="k">return</span> <span class="kc">None</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">base</span><span class="o">.</span><span class="n">PyJsError</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span><span class="n">obj</span><span class="p">[</span><span class="s1">&#39;message&#39;</span><span class="p">])</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">base</span><span class="o">.</span><span class="n">PyJsObject</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span>
<span class="n">key</span><span class="p">:</span> <span class="n">_js_object_to_py_object</span><span class="p">(</span><span class="n">value</span><span class="p">[</span><span class="s1">&#39;value&#39;</span><span class="p">])</span>
<span class="k">for</span> <span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span> <span class="ow">in</span> <span class="n">obj</span><span class="o">.</span><span class="n">own</span><span class="o">.</span><span class="n">items</span><span class="p">()</span>
<span class="p">}</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="n">base</span><span class="o">.</span><span class="n">JsObjectWrapper</span><span class="p">):</span>
<span class="k">return</span> <span class="n">_js_object_to_py_object</span><span class="p">(</span><span class="n">obj</span><span class="o">.</span><span class="n">_obj</span><span class="p">)</span>
<span class="k">return</span> <span class="n">obj</span>
<span class="k">if</span> <span class="n">expression</span><span class="p">:</span>
<span class="n">source</span> <span class="o">=</span> <span class="s1">&#39;</span><span class="se">\n</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">join</span><span class="p">([</span><span class="s1">&#39;function(__row__) {&#39;</span><span class="p">]</span> <span class="o">+</span> <span class="p">[</span>
<span class="sa">f</span><span class="s1">&#39; </span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1"> = __row__.</span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1">&#39;</span>
<span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">original_fields</span> <span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">expression</span>
<span class="p">]</span> <span class="o">+</span> <span class="p">[</span><span class="s1">&#39; return (&#39;</span> <span class="o">+</span> <span class="n">expression</span> <span class="o">+</span> <span class="s1">&#39;)&#39;</span><span class="p">]</span> <span class="o">+</span> <span class="p">[</span><span class="s1">&#39;}&#39;</span><span class="p">])</span>
<span class="n">js_func</span> <span class="o">=</span> <span class="n">_CustomJsObjectWrapper</span><span class="p">(</span><span class="n">js2py</span><span class="o">.</span><span class="n">eval_js</span><span class="p">(</span><span class="n">source</span><span class="p">))</span>
<span class="k">elif</span> <span class="nb">callable</span><span class="p">:</span>
<span class="n">js_func</span> <span class="o">=</span> <span class="n">_CustomJsObjectWrapper</span><span class="p">(</span><span class="n">js2py</span><span class="o">.</span><span class="n">eval_js</span><span class="p">(</span><span class="nb">callable</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">path</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="s1">&#39;.js&#39;</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s1">&#39;File &quot;</span><span class="si">{</span><span class="n">path</span><span class="si">}</span><span class="s1">&quot; is not a valid .js file.&#39;</span><span class="p">)</span>
<span class="n">udf_code</span> <span class="o">=</span> <span class="n">FileSystems</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">path</span><span class="p">)</span><span class="o">.</span><span class="n">read</span><span class="p">()</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
<span class="n">js</span> <span class="o">=</span> <span class="n">js2py</span><span class="o">.</span><span class="n">EvalJs</span><span class="p">()</span>
<span class="n">js</span><span class="o">.</span><span class="n">eval</span><span class="p">(</span><span class="n">udf_code</span><span class="p">)</span>
<span class="n">js_func</span> <span class="o">=</span> <span class="n">_CustomJsObjectWrapper</span><span class="p">(</span><span class="nb">getattr</span><span class="p">(</span><span class="n">js</span><span class="p">,</span> <span class="n">name</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">js_wrapper</span><span class="p">(</span><span class="n">row</span><span class="p">):</span>
<span class="n">row_as_dict</span> <span class="o">=</span> <span class="n">py_value_to_js_dict</span><span class="p">(</span><span class="n">row</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">js_result</span> <span class="o">=</span> <span class="n">js_func</span><span class="p">(</span><span class="n">row_as_dict</span><span class="p">)</span>
<span class="k">except</span> <span class="n">simplex</span><span class="o">.</span><span class="n">JsException</span> <span class="k">as</span> <span class="n">exn</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span>
<span class="sa">f</span><span class="s2">&quot;Error evaluating javascript expression: &quot;</span>
<span class="sa">f</span><span class="s2">&quot;</span><span class="si">{</span><span class="n">exn</span><span class="o">.</span><span class="n">mes</span><span class="p">[</span><span class="s1">&#39;message&#39;</span><span class="p">]</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span> <span class="kn">from</span> <span class="nn">exn</span>
<span class="k">return</span> <span class="n">dicts_to_rows</span><span class="p">(</span><span class="n">_js_object_to_py_object</span><span class="p">(</span><span class="n">js_result</span><span class="p">))</span>
<span class="k">return</span> <span class="n">js_wrapper</span>
<span class="k">def</span> <span class="nf">_expand_python_mapping_func</span><span class="p">(</span>
<span class="n">original_fields</span><span class="p">,</span> <span class="n">expression</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="nb">callable</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">path</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">name</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="n">path</span> <span class="ow">and</span> <span class="n">name</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">path</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="s1">&#39;.py&#39;</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s1">&#39;File &quot;</span><span class="si">{</span><span class="n">path</span><span class="si">}</span><span class="s1">&quot; is not a valid .py file.&#39;</span><span class="p">)</span>
<span class="n">py_file</span> <span class="o">=</span> <span class="n">FileSystems</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">path</span><span class="p">)</span><span class="o">.</span><span class="n">read</span><span class="p">()</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
<span class="k">return</span> <span class="n">python_callable</span><span class="o">.</span><span class="n">PythonCallableWithSource</span><span class="o">.</span><span class="n">load_from_script</span><span class="p">(</span>
<span class="n">py_file</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">expression</span><span class="p">:</span>
<span class="c1"># TODO(robertwb): Consider constructing a single callable that takes</span>
<span class="c1"># the row and returns the new row, rather than invoking (and unpacking)</span>
<span class="c1"># for each field individually.</span>
<span class="n">source</span> <span class="o">=</span> <span class="s1">&#39;</span><span class="se">\n</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">join</span><span class="p">([</span><span class="s1">&#39;def fn(__row__):&#39;</span><span class="p">]</span> <span class="o">+</span> <span class="p">[</span>
<span class="sa">f</span><span class="s1">&#39; </span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1"> = __row__.</span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1">&#39;</span>
<span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">original_fields</span> <span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">expression</span>
<span class="p">]</span> <span class="o">+</span> <span class="p">[</span><span class="s1">&#39; return (&#39;</span> <span class="o">+</span> <span class="n">expression</span> <span class="o">+</span> <span class="s1">&#39;)&#39;</span><span class="p">])</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">source</span> <span class="o">=</span> <span class="nb">callable</span>
<span class="k">return</span> <span class="n">python_callable</span><span class="o">.</span><span class="n">PythonCallableWithSource</span><span class="p">(</span><span class="n">source</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_validator</span><span class="p">(</span><span class="n">beam_type</span><span class="p">:</span> <span class="n">schema_pb2</span><span class="o">.</span><span class="n">FieldType</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">Any</span><span class="p">],</span> <span class="nb">bool</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns a callable that checks whether a value matches the given type.&quot;&quot;&quot;</span>
<span class="n">type_info</span> <span class="o">=</span> <span class="n">beam_type</span><span class="o">.</span><span class="n">WhichOneof</span><span class="p">(</span><span class="s2">&quot;type_info&quot;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">type_info</span> <span class="o">==</span> <span class="s2">&quot;atomic_type&quot;</span><span class="p">:</span>
<span class="k">if</span> <span class="n">beam_type</span><span class="o">.</span><span class="n">atomic_type</span> <span class="o">==</span> <span class="n">schema_pb2</span><span class="o">.</span><span class="n">BOOLEAN</span><span class="p">:</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="nb">bool</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">beam_type</span><span class="o">.</span><span class="n">atomic_type</span> <span class="o">==</span> <span class="n">schema_pb2</span><span class="o">.</span><span class="n">INT64</span><span class="p">:</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="nb">int</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">beam_type</span><span class="o">.</span><span class="n">atomic_type</span> <span class="o">==</span> <span class="n">schema_pb2</span><span class="o">.</span><span class="n">DOUBLE</span><span class="p">:</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="nb">float</span><span class="p">))</span>
<span class="k">elif</span> <span class="n">beam_type</span><span class="o">.</span><span class="n">atomic_type</span> <span class="o">==</span> <span class="n">schema_pb2</span><span class="o">.</span><span class="n">STRING</span><span class="p">:</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="sa">f</span><span class="s1">&#39;Unknown or unsupported atomic type: </span><span class="si">{</span><span class="n">beam_type</span><span class="o">.</span><span class="n">atomic_type</span><span class="si">}</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">type_info</span> <span class="o">==</span> <span class="s2">&quot;array_type&quot;</span><span class="p">:</span>
<span class="n">element_validator</span> <span class="o">=</span> <span class="n">_validator</span><span class="p">(</span><span class="n">beam_type</span><span class="o">.</span><span class="n">array_type</span><span class="o">.</span><span class="n">element_type</span><span class="p">)</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">value</span><span class="p">:</span> <span class="nb">all</span><span class="p">(</span><span class="n">element_validator</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">value</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">type_info</span> <span class="o">==</span> <span class="s2">&quot;iterable_type&quot;</span><span class="p">:</span>
<span class="n">element_validator</span> <span class="o">=</span> <span class="n">_validator</span><span class="p">(</span><span class="n">beam_type</span><span class="o">.</span><span class="n">iterable_type</span><span class="o">.</span><span class="n">element_type</span><span class="p">)</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">value</span><span class="p">:</span> <span class="nb">all</span><span class="p">(</span><span class="n">element_validator</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">value</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">type_info</span> <span class="o">==</span> <span class="s2">&quot;map_type&quot;</span><span class="p">:</span>
<span class="n">key_validator</span> <span class="o">=</span> <span class="n">_validator</span><span class="p">(</span><span class="n">beam_type</span><span class="o">.</span><span class="n">map_type</span><span class="o">.</span><span class="n">key_type</span><span class="p">)</span>
<span class="n">value_validator</span> <span class="o">=</span> <span class="n">_validator</span><span class="p">(</span><span class="n">beam_type</span><span class="o">.</span><span class="n">map_type</span><span class="o">.</span><span class="n">value_type</span><span class="p">)</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">value</span><span class="p">:</span> <span class="nb">all</span><span class="p">(</span>
<span class="n">key_validator</span><span class="p">(</span><span class="n">k</span><span class="p">)</span> <span class="ow">and</span> <span class="n">value_validator</span><span class="p">(</span><span class="n">v</span><span class="p">)</span> <span class="k">for</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">)</span> <span class="ow">in</span> <span class="n">value</span><span class="o">.</span><span class="n">items</span><span class="p">())</span>
<span class="k">elif</span> <span class="n">type_info</span> <span class="o">==</span> <span class="s2">&quot;row_type&quot;</span><span class="p">:</span>
<span class="n">validators</span> <span class="o">=</span> <span class="p">{</span>
<span class="n">field</span><span class="o">.</span><span class="n">name</span><span class="p">:</span> <span class="n">_validator</span><span class="p">(</span><span class="n">field</span><span class="o">.</span><span class="n">type</span><span class="p">)</span>
<span class="k">for</span> <span class="n">field</span> <span class="ow">in</span> <span class="n">beam_type</span><span class="o">.</span><span class="n">row_type</span><span class="o">.</span><span class="n">schema</span><span class="o">.</span><span class="n">fields</span>
<span class="p">}</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">row</span><span class="p">:</span> <span class="nb">all</span><span class="p">(</span>
<span class="n">validator</span><span class="p">(</span><span class="nb">getattr</span><span class="p">(</span><span class="n">row</span><span class="p">,</span> <span class="n">name</span><span class="p">))</span>
<span class="k">for</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">validator</span><span class="p">)</span> <span class="ow">in</span> <span class="n">validators</span><span class="o">.</span><span class="n">items</span><span class="p">())</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Unrecognized type_info: </span><span class="si">{</span><span class="n">type_info</span><span class="si">!r}</span><span class="s2">&quot;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_as_callable_for_pcoll</span><span class="p">(</span>
<span class="n">pcoll</span><span class="p">,</span>
<span class="n">fn_spec</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]],</span>
<span class="n">msg</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">language</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]):</span>
<span class="k">if</span> <span class="n">language</span> <span class="o">==</span> <span class="s1">&#39;javascript&#39;</span><span class="p">:</span>
<span class="n">options</span><span class="o">.</span><span class="n">YamlOptions</span><span class="o">.</span><span class="n">check_enabled</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="p">,</span> <span class="s1">&#39;javascript&#39;</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">input_schema</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">named_fields_from_element_type</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">))</span>
<span class="k">except</span> <span class="p">(</span><span class="ne">TypeError</span><span class="p">,</span> <span class="ne">ValueError</span><span class="p">)</span> <span class="k">as</span> <span class="n">exn</span><span class="p">:</span>
<span class="k">if</span> <span class="n">is_expr</span><span class="p">(</span><span class="n">fn_spec</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;Can only use expressions on a schema&#39;d input.&quot;</span><span class="p">)</span> <span class="kn">from</span> <span class="nn">exn</span>
<span class="n">input_schema</span> <span class="o">=</span> <span class="p">{}</span> <span class="c1"># unused</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">fn_spec</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span> <span class="ow">and</span> <span class="n">fn_spec</span> <span class="ow">in</span> <span class="n">input_schema</span><span class="p">:</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">row</span><span class="p">:</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">row</span><span class="p">,</span> <span class="n">fn_spec</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">_as_callable</span><span class="p">(</span><span class="nb">list</span><span class="p">(</span><span class="n">input_schema</span><span class="o">.</span><span class="n">keys</span><span class="p">()),</span> <span class="n">fn_spec</span><span class="p">,</span> <span class="n">msg</span><span class="p">,</span> <span class="n">language</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_as_callable</span><span class="p">(</span><span class="n">original_fields</span><span class="p">,</span> <span class="n">expr</span><span class="p">,</span> <span class="n">transform_name</span><span class="p">,</span> <span class="n">language</span><span class="p">):</span>
<span class="k">if</span> <span class="n">expr</span> <span class="ow">in</span> <span class="n">original_fields</span><span class="p">:</span>
<span class="k">return</span> <span class="n">expr</span>
<span class="c1"># TODO(yaml): support an imports parameter</span>
<span class="c1"># TODO(yaml): support a requirements parameter (possibly at a higher level)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expr</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="n">expr</span> <span class="o">=</span> <span class="p">{</span><span class="s1">&#39;expression&#39;</span><span class="p">:</span> <span class="n">expr</span><span class="p">}</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expr</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="sa">f</span><span class="s2">&quot;Ambiguous expression type (perhaps missing quoting?): </span><span class="si">{</span><span class="n">expr</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>
<span class="n">explicit_type</span> <span class="o">=</span> <span class="n">expr</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">&#39;output_type&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="n">_check_mapping_arguments</span><span class="p">(</span><span class="n">transform_name</span><span class="p">,</span> <span class="o">**</span><span class="n">expr</span><span class="p">)</span>
<span class="k">if</span> <span class="n">language</span> <span class="o">==</span> <span class="s2">&quot;javascript&quot;</span><span class="p">:</span>
<span class="n">func</span> <span class="o">=</span> <span class="n">_expand_javascript_mapping_func</span><span class="p">(</span><span class="n">original_fields</span><span class="p">,</span> <span class="o">**</span><span class="n">expr</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">language</span> <span class="o">==</span> <span class="s2">&quot;python&quot;</span><span class="p">:</span>
<span class="n">func</span> <span class="o">=</span> <span class="n">_expand_python_mapping_func</span><span class="p">(</span><span class="n">original_fields</span><span class="p">,</span> <span class="o">**</span><span class="n">expr</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="sa">f</span><span class="s1">&#39;Unknown language for mapping transform: </span><span class="si">{</span><span class="n">language</span><span class="si">}</span><span class="s1">. &#39;</span>
<span class="s1">&#39;Supported languages are &quot;javascript&quot; and &quot;python.&quot;&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">explicit_type</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">explicit_type</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="n">explicit_type</span> <span class="o">=</span> <span class="p">{</span><span class="s1">&#39;type&#39;</span><span class="p">:</span> <span class="n">explicit_type</span><span class="p">}</span>
<span class="n">beam_type</span> <span class="o">=</span> <span class="n">json_utils</span><span class="o">.</span><span class="n">json_type_to_beam_type</span><span class="p">(</span><span class="n">explicit_type</span><span class="p">)</span>
<span class="n">validator</span> <span class="o">=</span> <span class="n">_validator</span><span class="p">(</span><span class="n">beam_type</span><span class="p">)</span>
<span class="nd">@beam</span><span class="o">.</span><span class="n">typehints</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">schemas</span><span class="o">.</span><span class="n">typing_from_runner_api</span><span class="p">(</span><span class="n">beam_type</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">checking_func</span><span class="p">(</span><span class="n">row</span><span class="p">):</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">func</span><span class="p">(</span><span class="n">row</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">validator</span><span class="p">(</span><span class="n">result</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="sa">f</span><span class="s1">&#39;</span><span class="si">{</span><span class="n">result</span><span class="si">}</span><span class="s1"> violates schema </span><span class="si">{</span><span class="n">explicit_type</span><span class="si">}</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="n">result</span>
<span class="k">return</span> <span class="n">checking_func</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">func</span>
<div class="viewcode-block" id="ErrorHandlingConfig"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.ErrorHandlingConfig">[docs]</a><span class="k">class</span> <span class="nc">ErrorHandlingConfig</span><span class="p">(</span><span class="n">NamedTuple</span><span class="p">):</span>
<span class="n">output</span><span class="p">:</span> <span class="nb">str</span></div>
<span class="c1"># TODO: Other parameters are valid here too, but not common to Java.</span>
<div class="viewcode-block" id="exception_handling_args"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.exception_handling_args">[docs]</a><span class="k">def</span> <span class="nf">exception_handling_args</span><span class="p">(</span><span class="n">error_handling_spec</span><span class="p">):</span>
<span class="k">if</span> <span class="n">error_handling_spec</span><span class="p">:</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">&#39;dead_letter_tag&#39;</span> <span class="k">if</span> <span class="n">k</span> <span class="o">==</span> <span class="s1">&#39;output&#39;</span> <span class="k">else</span> <span class="n">k</span><span class="p">:</span> <span class="n">v</span>
<span class="k">for</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">)</span> <span class="ow">in</span> <span class="n">error_handling_spec</span><span class="o">.</span><span class="n">items</span><span class="p">()</span>
<span class="p">}</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">None</span></div>
<span class="k">def</span> <span class="nf">_map_errors_to_standard_format</span><span class="p">(</span><span class="n">input_type</span><span class="p">):</span>
<span class="c1"># TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple.</span>
<span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">Row</span><span class="p">(</span><span class="n">element</span><span class="o">=</span><span class="n">x</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">msg</span><span class="o">=</span><span class="nb">str</span><span class="p">(</span><span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">][</span><span class="mi">1</span><span class="p">]),</span> <span class="n">stack</span><span class="o">=</span><span class="nb">str</span><span class="p">(</span><span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">][</span><span class="mi">2</span><span class="p">]))</span>
<span class="p">)</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span>
<span class="n">RowTypeConstraint</span><span class="o">.</span><span class="n">from_fields</span><span class="p">([(</span><span class="s2">&quot;element&quot;</span><span class="p">,</span> <span class="n">input_type</span><span class="p">),</span> <span class="p">(</span><span class="s2">&quot;msg&quot;</span><span class="p">,</span> <span class="nb">str</span><span class="p">),</span>
<span class="p">(</span><span class="s2">&quot;stack&quot;</span><span class="p">,</span> <span class="nb">str</span><span class="p">)]))</span>
<div class="viewcode-block" id="maybe_with_exception_handling"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.maybe_with_exception_handling">[docs]</a><span class="k">def</span> <span class="nf">maybe_with_exception_handling</span><span class="p">(</span><span class="n">inner_expand</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="n">wrapped_pcoll</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">core</span><span class="o">.</span><span class="n">_MaybePValueWithErrors</span><span class="p">(</span>
<span class="n">pcoll</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_exception_handling_args</span><span class="p">)</span>
<span class="k">return</span> <span class="n">inner_expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">wrapped_pcoll</span><span class="p">)</span><span class="o">.</span><span class="n">as_result</span><span class="p">(</span>
<span class="n">_map_errors_to_standard_format</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">))</span>
<span class="k">return</span> <span class="n">expand</span></div>
<div class="viewcode-block" id="maybe_with_exception_handling_transform_fn"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.maybe_with_exception_handling_transform_fn">[docs]</a><span class="k">def</span> <span class="nf">maybe_with_exception_handling_transform_fn</span><span class="p">(</span><span class="n">transform_fn</span><span class="p">):</span>
<span class="nd">@functools</span><span class="o">.</span><span class="n">wraps</span><span class="p">(</span><span class="n">transform_fn</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">error_handling</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="n">wrapped_pcoll</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">core</span><span class="o">.</span><span class="n">_MaybePValueWithErrors</span><span class="p">(</span>
<span class="n">pcoll</span><span class="p">,</span> <span class="n">exception_handling_args</span><span class="p">(</span><span class="n">error_handling</span><span class="p">))</span>
<span class="k">return</span> <span class="n">transform_fn</span><span class="p">(</span><span class="n">wrapped_pcoll</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span><span class="o">.</span><span class="n">as_result</span><span class="p">(</span>
<span class="n">_map_errors_to_standard_format</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">))</span>
<span class="n">original_signature</span> <span class="o">=</span> <span class="n">inspect</span><span class="o">.</span><span class="n">signature</span><span class="p">(</span><span class="n">transform_fn</span><span class="p">)</span>
<span class="n">new_parameters</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">original_signature</span><span class="o">.</span><span class="n">parameters</span><span class="o">.</span><span class="n">values</span><span class="p">())</span>
<span class="n">error_handling_param</span> <span class="o">=</span> <span class="n">inspect</span><span class="o">.</span><span class="n">Parameter</span><span class="p">(</span>
<span class="s1">&#39;error_handling&#39;</span><span class="p">,</span>
<span class="n">inspect</span><span class="o">.</span><span class="n">Parameter</span><span class="o">.</span><span class="n">KEYWORD_ONLY</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">annotation</span><span class="o">=</span><span class="n">ErrorHandlingConfig</span><span class="p">)</span>
<span class="k">if</span> <span class="n">new_parameters</span><span class="p">[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">kind</span> <span class="o">==</span> <span class="n">inspect</span><span class="o">.</span><span class="n">Parameter</span><span class="o">.</span><span class="n">VAR_KEYWORD</span><span class="p">:</span>
<span class="n">new_parameters</span><span class="o">.</span><span class="n">insert</span><span class="p">(</span><span class="o">-</span><span class="mi">1</span><span class="p">,</span> <span class="n">error_handling_param</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">new_parameters</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">error_handling_param</span><span class="p">)</span>
<span class="n">expand</span><span class="o">.</span><span class="n">__signature__</span> <span class="o">=</span> <span class="n">original_signature</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="n">parameters</span><span class="o">=</span><span class="n">new_parameters</span><span class="p">)</span>
<span class="k">return</span> <span class="n">expand</span></div>
<span class="k">class</span> <span class="nc">_Explode</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Explodes (aka unnest/flatten) one or more fields producing multiple rows.</span>
<span class="sd"> Given one or more fields of iterable type, produces multiple rows, one for</span>
<span class="sd"> each value of that field. For example, a row of the form `(&#39;a&#39;, [1, 2, 3])`</span>
<span class="sd"> would expand to `(&#39;a&#39;, 1)`, `(&#39;a&#39;, 2)`, and `(&#39;a&#39;, 3)` when exploded on</span>
<span class="sd"> the second field.</span>
<span class="sd"> This is akin to a `FlatMap` when paired with the MapToFields transform.</span>
<span class="sd"> Args:</span>
<span class="sd"> fields: The list of fields to expand.</span>
<span class="sd"> cross_product: If multiple fields are specified, indicates whether the</span>
<span class="sd"> full cross-product of combinations should be produced, or if the</span>
<span class="sd"> first element of the first field corresponds to the first element</span>
<span class="sd"> of the second field, etc. For example, the row</span>
<span class="sd"> `([&#39;a&#39;, &#39;b&#39;], [1, 2])` would expand to the four rows</span>
<span class="sd"> `(&#39;a&#39;, 1)`, `(&#39;a&#39;, 2)`, `(&#39;b&#39;, 1)`, and `(&#39;b&#39;, 2)` when</span>
<span class="sd"> `cross_product` is set to `true` but only the two rows</span>
<span class="sd"> `(&#39;a&#39;, 1)` and `(&#39;b&#39;, 2)` when it is set to `false`.</span>
<span class="sd"> Only meaningful (and required) if multiple fields are specified.</span>
<span class="sd"> error_handling: Whether and how to handle errors during iteration.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">fields</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Collection</span><span class="p">[</span><span class="nb">str</span><span class="p">]],</span>
<span class="n">cross_product</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">error_handling</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Mapping</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">fields</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="n">fields</span> <span class="o">=</span> <span class="p">[</span><span class="n">fields</span><span class="p">]</span>
<span class="k">if</span> <span class="n">cross_product</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">fields</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">1</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;cross_product must be specified true or false &#39;</span>
<span class="s1">&#39;when exploding multiple fields&#39;</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># Doesn&#39;t matter.</span>
<span class="n">cross_product</span> <span class="o">=</span> <span class="kc">True</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_fields</span> <span class="o">=</span> <span class="n">fields</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_cross_product</span> <span class="o">=</span> <span class="n">cross_product</span>
<span class="c1"># TODO(yaml): Support standard error handling argument.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_exception_handling_args</span> <span class="o">=</span> <span class="n">exception_handling_args</span><span class="p">(</span><span class="n">error_handling</span><span class="p">)</span>
<span class="nd">@maybe_with_exception_handling</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="n">all_fields</span> <span class="o">=</span> <span class="p">[</span>
<span class="n">x</span> <span class="k">for</span> <span class="n">x</span><span class="p">,</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">named_fields_from_element_type</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">)</span>
<span class="p">]</span>
<span class="k">for</span> <span class="n">field</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fields</span><span class="p">:</span>
<span class="k">if</span> <span class="n">field</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">all_fields</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s1">&#39;Exploding unknown field &quot;</span><span class="si">{</span><span class="n">field</span><span class="si">}</span><span class="s1">&quot;&#39;</span><span class="p">)</span>
<span class="n">to_explode</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fields</span>
<span class="k">def</span> <span class="nf">explode_cross_product</span><span class="p">(</span><span class="n">base</span><span class="p">,</span> <span class="n">fields</span><span class="p">):</span>
<span class="k">if</span> <span class="n">fields</span><span class="p">:</span>
<span class="n">copy</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">base</span><span class="p">)</span>
<span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">base</span><span class="p">[</span><span class="n">fields</span><span class="p">[</span><span class="mi">0</span><span class="p">]]:</span>
<span class="n">copy</span><span class="p">[</span><span class="n">fields</span><span class="p">[</span><span class="mi">0</span><span class="p">]]</span> <span class="o">=</span> <span class="n">value</span>
<span class="k">yield from</span> <span class="n">explode_cross_product</span><span class="p">(</span><span class="n">copy</span><span class="p">,</span> <span class="n">fields</span><span class="p">[</span><span class="mi">1</span><span class="p">:])</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">beam</span><span class="o">.</span><span class="n">Row</span><span class="p">(</span><span class="o">**</span><span class="n">base</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">explode_zip</span><span class="p">(</span><span class="n">base</span><span class="p">,</span> <span class="n">fields</span><span class="p">):</span>
<span class="n">to_zip</span> <span class="o">=</span> <span class="p">[</span><span class="n">base</span><span class="p">[</span><span class="n">field</span><span class="p">]</span> <span class="k">for</span> <span class="n">field</span> <span class="ow">in</span> <span class="n">fields</span><span class="p">]</span>
<span class="n">copy</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">base</span><span class="p">)</span>
<span class="k">for</span> <span class="n">values</span> <span class="ow">in</span> <span class="n">itertools</span><span class="o">.</span><span class="n">zip_longest</span><span class="p">(</span><span class="o">*</span><span class="n">to_zip</span><span class="p">,</span> <span class="n">fillvalue</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">for</span> <span class="n">ix</span><span class="p">,</span> <span class="n">field</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="n">fields</span><span class="p">):</span>
<span class="n">copy</span><span class="p">[</span><span class="n">field</span><span class="p">]</span> <span class="o">=</span> <span class="n">values</span><span class="p">[</span><span class="n">ix</span><span class="p">]</span>
<span class="k">yield</span> <span class="n">beam</span><span class="o">.</span><span class="n">Row</span><span class="p">(</span><span class="o">**</span><span class="n">copy</span><span class="p">)</span>
<span class="n">cross_product</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cross_product</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">pcoll</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">row</span><span class="p">:</span>
<span class="p">(</span><span class="n">explode_cross_product</span> <span class="k">if</span> <span class="n">cross_product</span> <span class="k">else</span> <span class="n">explode_zip</span><span class="p">)</span>
<span class="p">({</span><span class="n">name</span><span class="p">:</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">row</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span>
<span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">all_fields</span><span class="p">},</span> <span class="n">to_explode</span><span class="p">)))</span>
<span class="k">def</span> <span class="nf">infer_output_type</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_type</span><span class="p">):</span>
<span class="k">return</span> <span class="n">row_type</span><span class="o">.</span><span class="n">RowTypeConstraint</span><span class="o">.</span><span class="n">from_fields</span><span class="p">([(</span>
<span class="n">name</span><span class="p">,</span>
<span class="n">trivial_inference</span><span class="o">.</span><span class="n">element_type</span><span class="p">(</span><span class="n">typ</span><span class="p">)</span> <span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_fields</span> <span class="k">else</span>
<span class="n">typ</span><span class="p">)</span> <span class="k">for</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">typ</span><span class="p">)</span> <span class="ow">in</span> <span class="n">named_fields_from_element_type</span><span class="p">(</span><span class="n">input_type</span><span class="p">)])</span>
<span class="k">def</span> <span class="nf">with_exception_handling</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="c1"># It&#39;s possible there&#39;s an error in iteration...</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_exception_handling_args</span> <span class="o">=</span> <span class="n">kwargs</span>
<span class="k">return</span> <span class="bp">self</span>
<span class="nd">@beam</span><span class="o">.</span><span class="n">ptransform</span><span class="o">.</span><span class="n">ptransform_fn</span>
<span class="nd">@maybe_with_exception_handling_transform_fn</span>
<span class="k">def</span> <span class="nf">_PyJsFilter</span><span class="p">(</span>
<span class="n">pcoll</span><span class="p">,</span> <span class="n">keep</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]],</span> <span class="n">language</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">):</span>
<span class="n">keep_fn</span> <span class="o">=</span> <span class="n">_as_callable_for_pcoll</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">keep</span><span class="p">,</span> <span class="s2">&quot;keep&quot;</span><span class="p">,</span> <span class="n">language</span><span class="p">)</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Filter</span><span class="p">(</span><span class="n">keep_fn</span><span class="p">)</span>
<div class="viewcode-block" id="is_expr"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.is_expr">[docs]</a><span class="k">def</span> <span class="nf">is_expr</span><span class="p">(</span><span class="n">v</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span> <span class="ow">or</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">dict</span><span class="p">)</span> <span class="ow">and</span> <span class="s1">&#39;expression&#39;</span> <span class="ow">in</span> <span class="n">v</span><span class="p">)</span></div>
<div class="viewcode-block" id="normalize_fields"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.normalize_fields">[docs]</a><span class="k">def</span> <span class="nf">normalize_fields</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">fields</span><span class="p">,</span> <span class="n">drop</span><span class="o">=</span><span class="p">(),</span> <span class="n">append</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">language</span><span class="o">=</span><span class="s1">&#39;generic&#39;</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">input_schema</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">named_fields_from_element_type</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">element_type</span><span class="p">))</span>
<span class="k">except</span> <span class="p">(</span><span class="ne">TypeError</span><span class="p">,</span> <span class="ne">ValueError</span><span class="p">)</span> <span class="k">as</span> <span class="n">exn</span><span class="p">:</span>
<span class="k">if</span> <span class="n">drop</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;Can only drop fields on a schema&#39;d input.&quot;</span><span class="p">)</span> <span class="kn">from</span> <span class="nn">exn</span>
<span class="k">if</span> <span class="n">append</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;Can only append fields on a schema&#39;d input.&quot;</span><span class="p">)</span> <span class="kn">from</span> <span class="nn">exn</span>
<span class="k">elif</span> <span class="nb">any</span><span class="p">(</span><span class="n">is_expr</span><span class="p">(</span><span class="n">x</span><span class="p">)</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">fields</span><span class="o">.</span><span class="n">values</span><span class="p">()):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;Can only use expressions on a schema&#39;d input.&quot;</span><span class="p">)</span> <span class="kn">from</span> <span class="nn">exn</span>
<span class="n">input_schema</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">if</span> <span class="n">drop</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">append</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;Can only drop fields if append is true.&quot;</span><span class="p">)</span>
<span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">drop</span><span class="p">:</span>
<span class="k">if</span> <span class="n">name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">input_schema</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s1">&#39;Dropping unknown field &quot;</span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1">&quot;&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">append</span><span class="p">:</span>
<span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">fields</span><span class="p">:</span>
<span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">input_schema</span> <span class="ow">and</span> <span class="n">name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">drop</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="sa">f</span><span class="s1">&#39;Redefinition of field &quot;</span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1">&quot;. &#39;</span>
<span class="s1">&#39;Cannot append a field that already exists in original input.&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">language</span> <span class="o">==</span> <span class="s1">&#39;generic&#39;</span><span class="p">:</span>
<span class="k">for</span> <span class="n">expr</span> <span class="ow">in</span> <span class="n">fields</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expr</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;Missing language specification. &quot;</span>
<span class="s2">&quot;Must specify a language when using a map with custom logic.&quot;</span><span class="p">)</span>
<span class="n">missing</span> <span class="o">=</span> <span class="nb">set</span><span class="p">(</span><span class="n">fields</span><span class="o">.</span><span class="n">values</span><span class="p">())</span> <span class="o">-</span> <span class="nb">set</span><span class="p">(</span><span class="n">input_schema</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span>
<span class="k">if</span> <span class="n">missing</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="sa">f</span><span class="s2">&quot;Missing language specification or unknown input fields: </span><span class="si">{</span><span class="n">missing</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">append</span><span class="p">:</span>
<span class="k">return</span> <span class="n">input_schema</span><span class="p">,</span> <span class="p">{</span>
<span class="o">**</span><span class="p">{</span><span class="n">name</span><span class="p">:</span> <span class="n">name</span>
<span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">input_schema</span><span class="o">.</span><span class="n">keys</span><span class="p">()</span> <span class="k">if</span> <span class="n">name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">drop</span><span class="p">},</span>
<span class="o">**</span><span class="n">fields</span>
<span class="p">}</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">input_schema</span><span class="p">,</span> <span class="n">fields</span></div>
<span class="nd">@beam</span><span class="o">.</span><span class="n">ptransform</span><span class="o">.</span><span class="n">ptransform_fn</span>
<span class="nd">@maybe_with_exception_handling_transform_fn</span>
<span class="k">def</span> <span class="nf">_PyJsMapToFields</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">language</span><span class="o">=</span><span class="s1">&#39;generic&#39;</span><span class="p">,</span> <span class="o">**</span><span class="n">mapping_args</span><span class="p">):</span>
<span class="n">input_schema</span><span class="p">,</span> <span class="n">fields</span> <span class="o">=</span> <span class="n">normalize_fields</span><span class="p">(</span>
<span class="n">pcoll</span><span class="p">,</span> <span class="n">language</span><span class="o">=</span><span class="n">language</span><span class="p">,</span> <span class="o">**</span><span class="n">mapping_args</span><span class="p">)</span>
<span class="k">if</span> <span class="n">language</span> <span class="o">==</span> <span class="s1">&#39;javascript&#39;</span><span class="p">:</span>
<span class="n">options</span><span class="o">.</span><span class="n">YamlOptions</span><span class="o">.</span><span class="n">check_enabled</span><span class="p">(</span><span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span><span class="p">,</span> <span class="s1">&#39;javascript&#39;</span><span class="p">)</span>
<span class="n">original_fields</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">input_schema</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Select</span><span class="p">(</span>
<span class="o">**</span><span class="p">{</span>
<span class="n">name</span><span class="p">:</span> <span class="n">_as_callable</span><span class="p">(</span><span class="n">original_fields</span><span class="p">,</span> <span class="n">expr</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">language</span><span class="p">)</span>
<span class="k">for</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">expr</span><span class="p">)</span> <span class="ow">in</span> <span class="n">fields</span><span class="o">.</span><span class="n">items</span><span class="p">()</span>
<span class="p">})</span>
<span class="nd">@beam</span><span class="o">.</span><span class="n">ptransform</span><span class="o">.</span><span class="n">ptransform_fn</span>
<span class="k">def</span> <span class="nf">_SqlFilterTransform</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">sql_transform_constructor</span><span class="p">,</span> <span class="n">keep</span><span class="p">,</span> <span class="n">language</span><span class="p">):</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">sql_transform_constructor</span><span class="p">(</span>
<span class="sa">f</span><span class="s1">&#39;SELECT * FROM PCOLLECTION WHERE </span><span class="si">{</span><span class="n">keep</span><span class="si">}</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="nd">@beam</span><span class="o">.</span><span class="n">ptransform</span><span class="o">.</span><span class="n">ptransform_fn</span>
<span class="k">def</span> <span class="nf">_SqlMapToFieldsTransform</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">sql_transform_constructor</span><span class="p">,</span> <span class="o">**</span><span class="n">mapping_args</span><span class="p">):</span>
<span class="n">_</span><span class="p">,</span> <span class="n">fields</span> <span class="o">=</span> <span class="n">normalize_fields</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="o">**</span><span class="n">mapping_args</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">extract_expr</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">v</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">return</span> <span class="n">v</span>
<span class="k">elif</span> <span class="s1">&#39;expression&#39;</span> <span class="ow">in</span> <span class="n">v</span><span class="p">:</span>
<span class="k">return</span> <span class="n">v</span><span class="p">[</span><span class="s1">&#39;expression&#39;</span><span class="p">]</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Only expressions allowed in SQL at </span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s2">.&quot;</span><span class="p">)</span>
<span class="n">selects</span> <span class="o">=</span> <span class="p">[</span>
<span class="sa">f</span><span class="s1">&#39;(</span><span class="si">{</span><span class="n">extract_expr</span><span class="p">(</span><span class="n">name</span><span class="p">,</span><span class="w"> </span><span class="n">expr</span><span class="p">)</span><span class="si">}</span><span class="s1">) AS </span><span class="si">{</span><span class="n">name</span><span class="si">}</span><span class="s1">&#39;</span>
<span class="k">for</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">expr</span><span class="p">)</span> <span class="ow">in</span> <span class="n">fields</span><span class="o">.</span><span class="n">items</span><span class="p">()</span>
<span class="p">]</span>
<span class="n">query</span> <span class="o">=</span> <span class="s2">&quot;SELECT &quot;</span> <span class="o">+</span> <span class="s2">&quot;, &quot;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">selects</span><span class="p">)</span> <span class="o">+</span> <span class="s2">&quot; FROM PCOLLECTION&quot;</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">sql_transform_constructor</span><span class="p">(</span><span class="n">query</span><span class="p">)</span>
<span class="nd">@beam</span><span class="o">.</span><span class="n">ptransform</span><span class="o">.</span><span class="n">ptransform_fn</span>
<span class="k">def</span> <span class="nf">_AssignTimestamps</span><span class="p">(</span>
<span class="n">pcoll</span><span class="p">,</span>
<span class="n">timestamp</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]],</span>
<span class="n">language</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">):</span>
<span class="n">timestamp_fn</span> <span class="o">=</span> <span class="n">_as_callable_for_pcoll</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">,</span> <span class="s1">&#39;timestamp&#39;</span><span class="p">,</span> <span class="n">language</span><span class="p">)</span>
<span class="n">T</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s1">&#39;T&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">TimestampedValue</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">timestamp_fn</span><span class="p">(</span><span class="n">x</span><span class="p">))</span>
<span class="p">)</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="n">T</span><span class="p">)</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="n">T</span><span class="p">)</span>
<div class="viewcode-block" id="create_mapping_providers"><a class="viewcode-back" href="../../../apache_beam.yaml.yaml_mapping.html#apache_beam.yaml.yaml_mapping.create_mapping_providers">[docs]</a><span class="k">def</span> <span class="nf">create_mapping_providers</span><span class="p">():</span>
<span class="c1"># These are MetaInlineProviders because their expansion is in terms of other</span>
<span class="c1"># YamlTransforms, but in a way that needs to be deferred until the input</span>
<span class="c1"># schema is known.</span>
<span class="k">return</span> <span class="p">[</span>
<span class="n">yaml_provider</span><span class="o">.</span><span class="n">InlineProvider</span><span class="p">({</span>
<span class="s1">&#39;AssignTimestamps-python&#39;</span><span class="p">:</span> <span class="n">_AssignTimestamps</span><span class="p">,</span>
<span class="s1">&#39;AssignTimestamps-javascript&#39;</span><span class="p">:</span> <span class="n">_AssignTimestamps</span><span class="p">,</span>
<span class="s1">&#39;AssignTimestamps-generic&#39;</span><span class="p">:</span> <span class="n">_AssignTimestamps</span><span class="p">,</span>
<span class="s1">&#39;Explode&#39;</span><span class="p">:</span> <span class="n">_Explode</span><span class="p">,</span>
<span class="s1">&#39;Filter-python&#39;</span><span class="p">:</span> <span class="n">_PyJsFilter</span><span class="p">,</span>
<span class="s1">&#39;Filter-javascript&#39;</span><span class="p">:</span> <span class="n">_PyJsFilter</span><span class="p">,</span>
<span class="s1">&#39;MapToFields-python&#39;</span><span class="p">:</span> <span class="n">_PyJsMapToFields</span><span class="p">,</span>
<span class="s1">&#39;MapToFields-javascript&#39;</span><span class="p">:</span> <span class="n">_PyJsMapToFields</span><span class="p">,</span>
<span class="s1">&#39;MapToFields-generic&#39;</span><span class="p">:</span> <span class="n">_PyJsMapToFields</span><span class="p">,</span>
<span class="p">}),</span>
<span class="n">yaml_provider</span><span class="o">.</span><span class="n">SqlBackedProvider</span><span class="p">({</span>
<span class="s1">&#39;Filter-sql&#39;</span><span class="p">:</span> <span class="n">_SqlFilterTransform</span><span class="p">,</span>
<span class="s1">&#39;Filter-calcite&#39;</span><span class="p">:</span> <span class="n">_SqlFilterTransform</span><span class="p">,</span>
<span class="s1">&#39;MapToFields-sql&#39;</span><span class="p">:</span> <span class="n">_SqlMapToFieldsTransform</span><span class="p">,</span>
<span class="s1">&#39;MapToFields-calcite&#39;</span><span class="p">:</span> <span class="n">_SqlMapToFieldsTransform</span><span class="p">,</span>
<span class="p">}),</span>
<span class="p">]</span></div>
</pre></div>
</div>
</div>
<footer>
  <!-- Page footer: copyright notice followed by the Sphinx / Read the Docs theme attribution. -->
  <hr>
  <div role="contentinfo">
    <p>
      &copy; Copyright
    </p>
  </div>
  Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
  // Once the DOM is ready, call the theme's navigation initialiser.
  // NOTE(review): the boolean passed to enable() is presumably a theme
  // option (e.g. sticky navigation) - confirm against sphinx_rtd_theme's theme.js.
  jQuery(document).ready(function () {
    SphinxRtdTheme.Navigation.enable(true);
  });
</script>
</body>
</html>