blob: eb1806d8cc58fb022e2a5a64324dcaeceb3f7f07 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<title>Python User-defined Table Functions (UDTFs) &#8212; PySpark 3.5.2 documentation</title>
<link href="../../_static/styles/theme.css?digest=1999514e3f237ded88cf" rel="stylesheet">
<link href="../../_static/styles/pydata-sphinx-theme.css?digest=1999514e3f237ded88cf" rel="stylesheet">
<link rel="stylesheet"
href="../../_static/vendor/fontawesome/5.13.0/css/all.min.css">
<link rel="preload" as="font" type="font/woff2" crossorigin
href="../../_static/vendor/fontawesome/5.13.0/webfonts/fa-solid-900.woff2">
<link rel="preload" as="font" type="font/woff2" crossorigin
href="../../_static/vendor/fontawesome/5.13.0/webfonts/fa-brands-400.woff2">
<link rel="stylesheet" href="../../_static/styles/pydata-sphinx-theme.css" type="text/css" />
<link rel="stylesheet" href="../../_static/pygments.css" type="text/css" />
<link rel="stylesheet" type="text/css" href="../../_static/copybutton.css" />
<link rel="stylesheet" type="text/css" href="../../_static/css/pyspark.css" />
<link rel="preload" as="script" href="../../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf">
<script id="documentation_options" data-url_root="../../" src="../../_static/documentation_options.js"></script>
<script src="../../_static/jquery.js"></script>
<script src="../../_static/underscore.js"></script>
<script src="../../_static/doctools.js"></script>
<script src="../../_static/language_data.js"></script>
<script src="../../_static/clipboard.min.js"></script>
<script src="../../_static/copybutton.js"></script>
<script crossorigin="anonymous" integrity="sha256-Ae2Vz/4ePdIu6ZyI/5ZGsYnb+m0JlOmKPjt6XZ9JJkA=" src="https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.4/require.min.js"></script>
<script async="async" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
<script type="text/x-mathjax-config">MathJax.Hub.Config({"tex2jax": {"inlineMath": [["$", "$"], ["\\(", "\\)"]], "processEscapes": true, "ignoreClass": "document", "processClass": "math|output_area"}})</script>
<link rel="canonical" href="https://spark.apache.org/docs/latest/api/python/user_guide/sql/python_udtf.html" />
<link rel="search" title="Search" href="../../search.html" />
<link rel="next" title="Pandas API on Spark" href="../pandas_on_spark/index.html" />
<link rel="prev" title="Apache Arrow in PySpark" href="arrow_pandas.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="en">
<!-- Google Analytics -->
</head>
<body data-spy="scroll" data-target="#bd-toc-nav" data-offset="80">
<div class="container-fluid" id="banner"></div>
<nav class="navbar navbar-light navbar-expand-lg bg-light fixed-top bd-navbar" id="navbar-main"><div class="container-xl">
<div id="navbar-start">
<a class="navbar-brand" href="../../index.html">
<img src="../../_static/spark-logo-reverse.png" class="logo" alt="logo">
</a>
</div>
<button class="navbar-toggler" type="button" data-toggle="collapse" data-target="#navbar-collapsible" aria-controls="navbar-collapsible" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div id="navbar-collapsible" class="col-lg-9 collapse navbar-collapse">
<div id="navbar-center" class="mr-auto">
<div class="navbar-center-item">
<ul id="navbar-main-elements" class="navbar-nav">
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../index.html">
Overview
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="toctree-l1 current active nav-item">
<a class="reference internal nav-link" href="../index.html">
User Guides
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../reference/index.html">
API Reference
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../development/index.html">
Development
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</div>
</div>
<div id="navbar-end">
<div class="navbar-end-item">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
<button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-toggle="dropdown">
3.5.2
<span class="caret"></span>
</button>
<div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
<!-- dropdown will be populated by javascript on page load -->
</div>
</div>
<script type="text/javascript">
// Build the docs homepage URL for one version entry.
// Only `entry.version` is read; it is substituted for the "{version}"
// placeholder of the URL pattern.
function buildURL(entry) {
  // URL pattern supplied by jinja
  const homepagePattern = "https://spark.apache.org/docs/{version}/api/python/index.html";
  return homepagePattern.replace("{version}", entry.version);
}
// Click handler for the version-switcher links.
// Checks whether the page currently being viewed also exists in the selected
// docs version and, if so, goes there instead of to that version's homepage.
//
// event: click event whose target is the switcher link; the link's "href"
//        attribute holds the homepage URL of the selected docs version.
// Returns false; navigation is performed by the AJAX callbacks below.
function checkPageExistsAndRedirect(event) {
  const currentFilePath = "user_guide/sql/python_udtf.html",
        otherDocsHomepage = event.target.getAttribute("href");
  // The homepage URL ends in "index.html". Drop that file name so the page
  // path is appended to the docs root; appending it to the full homepage URL
  // yields ".../index.htmluser_guide/...", which can never exist.
  const otherDocsRoot = otherDocsHomepage.replace(/index\.html$/, "");
  const tryUrl = `${otherDocsRoot}${currentFilePath}`;
  $.ajax({
    type: 'HEAD',
    url: tryUrl,
    // if the page exists, go there
    success: function() {
      location.href = tryUrl;
    }
  }).fail(function() {
    // otherwise fall back to the homepage of the selected docs version
    location.href = otherDocsHomepage;
  });
  return false;
}
// Populate the version switcher dropdown from the published versions JSON.
(function () {
  $.getJSON("https://spark.apache.org/static/versions.json", (data) => {
    const dropdown = $("#version_switcher");
    // Nodes are appended in the order the entries are listed, before any
    // page-existence check runs; until a link is clicked it simply points at
    // the homepage of that docs version.
    $.each(data, (index, entry) => {
      // entries without a custom name (e.g., "latest") are labelled with
      // their version string
      if (!("name" in entry)) {
        entry.name = entry.version;
      }
      entry.url = buildURL(entry);

      const link = document.createElement("a");
      link.className = "list-group-item list-group-item-action py-1";
      link.setAttribute("href", String(entry.url));
      link.textContent = String(entry.name);
      // assigned as an on-property handler so its `false` return value
      // cancels the default navigation
      link.onclick = checkPageExistsAndRedirect;
      dropdown.append(link);
    });
  });
})();
</script>
</div>
</div>
</div>
</div>
</nav>
<div class="container-xl">
<div class="row">
<!-- Only show if we have sidebars configured, else just a small margin -->
<div class="col-12 col-md-3 bd-sidebar">
<div class="sidebar-start-items"><form class="bd-search d-flex align-items-center" action="../../search.html" method="get">
<i class="icon fas fa-search"></i>
<input type="search" class="form-control" name="q" id="search-input" placeholder="Search the docs ..." aria-label="Search the docs ..." autocomplete="off" >
</form><nav class="bd-links" id="bd-docs-nav" aria-label="Main navigation">
<div class="bd-toc-item active">
<ul class="current nav bd-sidenav">
<li class="toctree-l1">
<a class="reference internal" href="../python_packaging.html">
Python Package Management
</a>
</li>
<li class="toctree-l1 current active has-children">
<a class="reference internal" href="index.html">
Spark SQL
</a>
<input checked="" class="toctree-checkbox" id="toctree-checkbox-1" name="toctree-checkbox-1" type="checkbox"/>
<label for="toctree-checkbox-1">
<i class="fas fa-chevron-down">
</i>
</label>
<ul class="current">
<li class="toctree-l2">
<a class="reference internal" href="arrow_pandas.html">
Apache Arrow in PySpark
</a>
</li>
<li class="toctree-l2 current active">
<a class="current reference internal" href="#">
Python User-defined Table Functions (UDTFs)
</a>
</li>
</ul>
</li>
<li class="toctree-l1 has-children">
<a class="reference internal" href="../pandas_on_spark/index.html">
Pandas API on Spark
</a>
<input class="toctree-checkbox" id="toctree-checkbox-2" name="toctree-checkbox-2" type="checkbox"/>
<label for="toctree-checkbox-2">
<i class="fas fa-chevron-down">
</i>
</label>
<ul>
<li class="toctree-l2">
<a class="reference internal" href="../pandas_on_spark/options.html">
Options and settings
</a>
</li>
<li class="toctree-l2">
<a class="reference internal" href="../pandas_on_spark/pandas_pyspark.html">
From/to pandas and PySpark DataFrames
</a>
</li>
<li class="toctree-l2">
<a class="reference internal" href="../pandas_on_spark/transform_apply.html">
Transform and apply a function
</a>
</li>
<li class="toctree-l2">
<a class="reference internal" href="../pandas_on_spark/types.html">
Type Support in Pandas API on Spark
</a>
</li>
<li class="toctree-l2">
<a class="reference internal" href="../pandas_on_spark/typehints.html">
Type Hints in Pandas API on Spark
</a>
</li>
<li class="toctree-l2">
<a class="reference internal" href="../pandas_on_spark/from_to_dbms.html">
From/to other DBMSes
</a>
</li>
<li class="toctree-l2">
<a class="reference internal" href="../pandas_on_spark/best_practices.html">
Best Practices
</a>
</li>
<li class="toctree-l2">
<a class="reference internal" href="../pandas_on_spark/supported_pandas_api.html">
Supported pandas API
</a>
</li>
<li class="toctree-l2">
<a class="reference internal" href="../pandas_on_spark/faq.html">
FAQ
</a>
</li>
</ul>
</li>
</ul>
</div>
</nav>
</div>
<div class="sidebar-end-items">
</div>
</div>
<div class="d-none d-xl-block col-xl-2 bd-toc">
<div class="toc-item">
<div class="tocsection onthispage pt-5 pb-3">
<i class="fas fa-list"></i> On this page
</div>
<nav id="bd-toc-nav">
<ul class="visible nav section-nav flex-column">
<li class="toc-h2 nav-item toc-entry">
<a class="reference internal nav-link" href="#implementing-a-python-udtf">
Implementing a Python UDTF
</a>
</li>
<li class="toc-h2 nav-item toc-entry">
<a class="reference internal nav-link" href="#registering-and-using-python-udtfs-in-sql">
Registering and Using Python UDTFs in SQL
</a>
</li>
<li class="toc-h2 nav-item toc-entry">
<a class="reference internal nav-link" href="#arrow-optimization">
Arrow Optimization
</a>
<ul class="nav section-nav flex-column">
<li class="toc-h3 nav-item toc-entry">
<a class="reference internal nav-link" href="#table-input-argument">
TABLE input argument
</a>
</li>
</ul>
</li>
<li class="toc-h2 nav-item toc-entry">
<a class="reference internal nav-link" href="#more-examples">
More Examples
</a>
</li>
</ul>
</nav>
</div>
<div class="toc-item">
</div>
</div>
<main class="col-12 col-md-9 col-xl-7 py-md-5 pl-md-5 pr-md-4 bd-content" role="main">
<div>
<div class="section" id="python-user-defined-table-functions-udtfs">
<h1>Python User-defined Table Functions (UDTFs)<a class="headerlink" href="#python-user-defined-table-functions-udtfs" title="Permalink to this headline">¶</a></h1>
<p>Spark 3.5 introduces the Python user-defined table function (UDTF), a new type of user-defined function.
Unlike scalar functions that return a single result value from each call, each UDTF is invoked in
the <code class="docutils literal notranslate"><span class="pre">FROM</span></code> clause of a query and returns an entire table as output.
Each UDTF call can accept zero or more arguments.
These arguments can either be scalar expressions or table arguments that represent entire input tables.</p>
<div class="section" id="implementing-a-python-udtf">
<h2>Implementing a Python UDTF<a class="headerlink" href="#implementing-a-python-udtf" title="Permalink to this headline">¶</a></h2>
<p>To implement a Python UDTF, you first need to define a class implementing the methods:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">class</span> <span class="nc">PythonUDTF</span><span class="p">:</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Initializes the user-defined table function (UDTF). This is optional.</span>
<span class="sd"> This method serves as the default constructor and is called once when the</span>
<span class="sd"> UDTF is instantiated on the executor side.</span>
<span class="sd"> Any class fields assigned in this method will be available for subsequent</span>
<span class="sd"> calls to the `eval` and `terminate` methods. This class instance will remain</span>
<span class="sd"> alive until all rows in the current partition have been consumed by the `eval`</span>
<span class="sd"> method.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> - This method does not accept any extra arguments. Only the default</span>
<span class="sd"> constructor is supported.</span>
<span class="sd"> - You cannot create or reference the Spark session within the UDTF. Any</span>
<span class="sd"> attempt to do so will result in a serialization error.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="o">...</span>
<span class="k">def</span> <span class="nf">eval</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">:</span> <span class="n">Any</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">Any</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Evaluates the function using the given input arguments.</span>
<span class="sd"> This method is required and must be implemented.</span>
<span class="sd"> Argument Mapping:</span>
<span class="sd"> - Each provided scalar expression maps to exactly one value in the</span>
<span class="sd"> `*args` list.</span>
<span class="sd"> - Each provided table argument maps to a pyspark.sql.Row object containing</span>
<span class="sd"> the columns in the order they appear in the provided input table,</span>
<span class="sd"> and with the names computed by the query analyzer.</span>
<span class="sd"> This method is called on every input row, and can produce zero or more</span>
<span class="sd"> output rows. Each element in the output tuple corresponds to one column</span>
<span class="sd"> specified in the return type of the UDTF.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> *args : Any</span>
<span class="sd"> Arbitrary positional arguments representing the input to the UDTF.</span>
<span class="sd"> Yields</span>
<span class="sd"> ------</span>
<span class="sd"> tuple</span>
<span class="sd"> A tuple representing a single row in the UDTF result table.</span>
<span class="sd"> Yield as many times as needed to produce multiple rows.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> - The result of the function must be a tuple representing a single row</span>
<span class="sd"> in the UDTF result table.</span>
<span class="sd"> - UDTFs currently do not accept keyword arguments during the function call.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> eval that returns one row and one column for each input.</span>
<span class="sd"> &gt;&gt;&gt; def eval(self, x: int):</span>
<span class="sd"> ... yield (x, )</span>
<span class="sd"> eval that returns two rows and two columns for each input.</span>
<span class="sd"> &gt;&gt;&gt; def eval(self, x: int, y: int):</span>
<span class="sd"> ... yield (x + y, x - y)</span>
<span class="sd"> ... yield (y + x, y - x)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="o">...</span>
<span class="k">def</span> <span class="nf">terminate</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">Any</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Called when the UDTF has processed all input rows.</span>
<span class="sd"> This method is optional to implement and is useful for performing any</span>
<span class="sd"> cleanup or finalization operations after the UDTF has finished processing</span>
<span class="sd"> all rows. It can also be used to yield additional rows if needed.</span>
<span class="sd"> Table functions that consume all rows in the entire input partition</span>
<span class="sd"> and then compute and return the entire output table can do so from</span>
<span class="sd"> this method as well (please be mindful of memory usage when doing</span>
<span class="sd"> this).</span>
<span class="sd"> Yields</span>
<span class="sd"> ------</span>
<span class="sd"> tuple</span>
<span class="sd"> A tuple representing a single row in the UDTF result table.</span>
<span class="sd"> Yield this if you want to return additional rows during termination.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; def terminate(self) -&gt; Iterator[Any]:</span>
<span class="sd"> ... yield &quot;done&quot;, None</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="o">...</span>
</pre></div>
</div>
<p>The return type of the UDTF defines the schema of the table it outputs.
It must be either a <code class="docutils literal notranslate"><span class="pre">StructType</span></code>, for example <code class="docutils literal notranslate"><span class="pre">StructType().add(&quot;c1&quot;,</span> <span class="pre">StringType())</span></code>
or a DDL string representing a struct type, for example <code class="docutils literal notranslate"><span class="pre">c1:</span> <span class="pre">string</span></code>.</p>
<p><strong>Example of UDTF Class Implementation</strong></p>
<p>Here is a simple example of a UDTF class implementation:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1"># Define the UDTF class and implement the required `eval` method.</span>
<span class="k">class</span> <span class="nc">SquareNumbers</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">eval</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">start</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> <span class="n">end</span><span class="p">:</span> <span class="nb">int</span><span class="p">):</span>
<span class="k">for</span> <span class="n">num</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="n">start</span><span class="p">,</span> <span class="n">end</span> <span class="o">+</span> <span class="mi">1</span><span class="p">):</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">num</span><span class="p">,</span> <span class="n">num</span> <span class="o">*</span> <span class="n">num</span><span class="p">)</span>
</pre></div>
</div>
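<p><strong>Instantiating a UDTF with the <code class="docutils literal notranslate"><span class="pre">udtf</span></code> Function</strong></p>
<p>To make use of the UDTF, you’ll first need to instantiate it by passing the class to the <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.functions.udtf.html#pyspark.sql.functions.udtf" title="pyspark.sql.functions.udtf"><code class="xref py py-func docutils literal notranslate"><span class="pre">udtf()</span></code></a> function:</p>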
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">lit</span><span class="p">,</span> <span class="n">udtf</span>
<span class="c1"># Create a UDTF using the class definition and the `udtf` function.</span>
<span class="n">square_num</span> <span class="o">=</span> <span class="n">udtf</span><span class="p">(</span><span class="n">SquareNumbers</span><span class="p">,</span> <span class="n">returnType</span><span class="o">=</span><span class="s2">&quot;num: int, squared: int&quot;</span><span class="p">)</span>
<span class="c1"># Invoke the UDTF in PySpark.</span>
<span class="n">square_num</span><span class="p">(</span><span class="n">lit</span><span class="p">(</span><span class="mi">1</span><span class="p">),</span> <span class="n">lit</span><span class="p">(</span><span class="mi">3</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+-------+</span>
<span class="c1"># |num|squared|</span>
<span class="c1"># +---+-------+</span>
<span class="c1"># | 1| 1|</span>
<span class="c1"># | 2| 4|</span>
<span class="c1"># | 3| 9|</span>
<span class="c1"># +---+-------+</span>
</pre></div>
</div>
<p><strong>Instantiating a UDTF with the <code class="docutils literal notranslate"><span class="pre">udtf</span></code> Decorator</strong></p>
<p>An alternative way to create a UDTF is to apply <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.functions.udtf.html#pyspark.sql.functions.udtf" title="pyspark.sql.functions.udtf"><code class="xref py py-func docutils literal notranslate"><span class="pre">udtf()</span></code></a> as a decorator (<code class="docutils literal notranslate"><span class="pre">&#64;udtf</span></code>) directly on the class definition:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">lit</span><span class="p">,</span> <span class="n">udtf</span>
<span class="c1"># Define a UDTF using the `udtf` decorator directly on the class.</span>
<span class="nd">@udtf</span><span class="p">(</span><span class="n">returnType</span><span class="o">=</span><span class="s2">&quot;num: int, squared: int&quot;</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">SquareNumbers</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">eval</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">start</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> <span class="n">end</span><span class="p">:</span> <span class="nb">int</span><span class="p">):</span>
<span class="k">for</span> <span class="n">num</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="n">start</span><span class="p">,</span> <span class="n">end</span> <span class="o">+</span> <span class="mi">1</span><span class="p">):</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">num</span><span class="p">,</span> <span class="n">num</span> <span class="o">*</span> <span class="n">num</span><span class="p">)</span>
<span class="c1"># Invoke the UDTF in PySpark using the SquareNumbers class directly.</span>
<span class="n">SquareNumbers</span><span class="p">(</span><span class="n">lit</span><span class="p">(</span><span class="mi">1</span><span class="p">),</span> <span class="n">lit</span><span class="p">(</span><span class="mi">3</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+-------+</span>
<span class="c1"># |num|squared|</span>
<span class="c1"># +---+-------+</span>
<span class="c1"># | 1| 1|</span>
<span class="c1"># | 2| 4|</span>
<span class="c1"># | 3| 9|</span>
<span class="c1"># +---+-------+</span>
</pre></div>
</div>
<p>For more detailed usage, please see <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.functions.udtf.html#pyspark.sql.functions.udtf" title="pyspark.sql.functions.udtf"><code class="xref py py-func docutils literal notranslate"><span class="pre">udtf()</span></code></a>.</p>
</div>
<div class="section" id="registering-and-using-python-udtfs-in-sql">
<h2>Registering and Using Python UDTFs in SQL<a class="headerlink" href="#registering-and-using-python-udtfs-in-sql" title="Permalink to this headline">¶</a></h2>
<p>Python UDTFs can also be registered and used in SQL queries.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">udtf</span>
<span class="nd">@udtf</span><span class="p">(</span><span class="n">returnType</span><span class="o">=</span><span class="s2">&quot;word: string&quot;</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">WordSplitter</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">eval</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">text</span><span class="p">:</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">for</span> <span class="n">word</span> <span class="ow">in</span> <span class="n">text</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s2">&quot; &quot;</span><span class="p">):</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">word</span><span class="o">.</span><span class="n">strip</span><span class="p">(),)</span>
<span class="c1"># Register the UDTF for use in Spark SQL.</span>
<span class="n">spark</span><span class="o">.</span><span class="n">udtf</span><span class="o">.</span><span class="n">register</span><span class="p">(</span><span class="s2">&quot;split_words&quot;</span><span class="p">,</span> <span class="n">WordSplitter</span><span class="p">)</span>
<span class="c1"># Example: Using the UDTF in SQL.</span>
<span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">&quot;SELECT * FROM split_words(&#39;hello world&#39;)&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +-----+</span>
<span class="c1"># | word|</span>
<span class="c1"># +-----+</span>
<span class="c1"># |hello|</span>
<span class="c1"># |world|</span>
<span class="c1"># +-----+</span>
<span class="c1"># Example: Using the UDTF with a lateral join in SQL.</span>
<span class="c1"># The lateral join allows us to reference the columns and aliases</span>
<span class="c1"># in the previous FROM clause items as inputs to the UDTF.</span>
<span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span>
<span class="s2">&quot;SELECT * FROM VALUES (&#39;Hello World&#39;), (&#39;Apache Spark&#39;) t(text), &quot;</span>
<span class="s2">&quot;LATERAL split_words(text)&quot;</span>
<span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +------------+------+</span>
<span class="c1"># | text| word|</span>
<span class="c1"># +------------+------+</span>
<span class="c1"># | Hello World| Hello|</span>
<span class="c1"># | Hello World| World|</span>
<span class="c1"># |Apache Spark|Apache|</span>
<span class="c1"># |Apache Spark| Spark|</span>
<span class="c1"># +------------+------+</span>
</pre></div>
</div>
</div>
<div class="section" id="arrow-optimization">
<h2>Arrow Optimization<a class="headerlink" href="#arrow-optimization" title="Permalink to this headline">¶</a></h2>
<p>Apache Arrow is an in-memory columnar data format used in Spark to efficiently transfer
data between Java and Python processes. Apache Arrow is disabled by default for Python UDTFs.</p>
<p>Arrow can improve performance when each input row generates a large result table from the UDTF.</p>
<p>To enable Arrow optimization, set the <code class="docutils literal notranslate"><span class="pre">spark.sql.execution.pythonUDTF.arrow.enabled</span></code>
configuration to <code class="docutils literal notranslate"><span class="pre">true</span></code>. You can also enable it by specifying the <code class="docutils literal notranslate"><span class="pre">useArrow</span></code> parameter
when declaring the UDTF.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">udtf</span>
<span class="nd">@udtf</span><span class="p">(</span><span class="n">returnType</span><span class="o">=</span><span class="s2">&quot;c1: int, c2: int&quot;</span><span class="p">,</span> <span class="n">useArrow</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">PlusOne</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">eval</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">x</span><span class="p">:</span> <span class="nb">int</span><span class="p">):</span>
<span class="k">yield</span> <span class="n">x</span><span class="p">,</span> <span class="n">x</span> <span class="o">+</span> <span class="mi">1</span>
</pre></div>
</div>
<p>For more details, please see <a class="reference internal" href="arrow_pandas.html"><span class="doc">Apache Arrow in PySpark</span></a>.</p>
<div class="section" id="table-input-argument">
<h3>TABLE input argument<a class="headerlink" href="#table-input-argument" title="Permalink to this headline">¶</a></h3>
<p>Python UDTFs can also take a TABLE as an input argument, and it can be used in conjunction
with scalar input arguments.
By default, you are allowed to have only one TABLE argument as input, primarily for
performance reasons. If you need to have more than one TABLE input argument,
you can enable this by setting the <code class="docutils literal notranslate"><span class="pre">spark.sql.tvf.allowMultipleTableArguments.enabled</span></code>
configuration to <code class="docutils literal notranslate"><span class="pre">true</span></code>.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">udtf</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="kn">import</span> <span class="n">Row</span>
<span class="nd">@udtf</span><span class="p">(</span><span class="n">returnType</span><span class="o">=</span><span class="s2">&quot;id: int&quot;</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">FilterUDTF</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">eval</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">row</span><span class="p">:</span> <span class="n">Row</span><span class="p">):</span>
<span class="k">if</span> <span class="n">row</span><span class="p">[</span><span class="s2">&quot;id&quot;</span><span class="p">]</span> <span class="o">&gt;</span> <span class="mi">5</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">row</span><span class="p">[</span><span class="s2">&quot;id&quot;</span><span class="p">],</span>
<span class="n">spark</span><span class="o">.</span><span class="n">udtf</span><span class="o">.</span><span class="n">register</span><span class="p">(</span><span class="s2">&quot;filter_udtf&quot;</span><span class="p">,</span> <span class="n">FilterUDTF</span><span class="p">)</span>
<span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">&quot;SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+</span>
<span class="c1"># | id|</span>
<span class="c1"># +---+</span>
<span class="c1"># | 6|</span>
<span class="c1"># | 7|</span>
<span class="c1"># | 8|</span>
<span class="c1"># | 9|</span>
<span class="c1"># +---+</span>
</pre></div>
</div>
</div>
</div>
<div class="section" id="more-examples">
<h2>More Examples<a class="headerlink" href="#more-examples" title="Permalink to this headline">¶</a></h2>
<p>A Python UDTF that expands date ranges into individual dates:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">timedelta</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">lit</span><span class="p">,</span> <span class="n">udtf</span>
<span class="nd">@udtf</span><span class="p">(</span><span class="n">returnType</span><span class="o">=</span><span class="s2">&quot;date: string&quot;</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">DateExpander</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">eval</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">start_date</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">end_date</span><span class="p">:</span> <span class="nb">str</span><span class="p">):</span>
<span class="n">current</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">strptime</span><span class="p">(</span><span class="n">start_date</span><span class="p">,</span> <span class="s1">&#39;%Y-%m-</span><span class="si">%d</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="n">end</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">strptime</span><span class="p">(</span><span class="n">end_date</span><span class="p">,</span> <span class="s1">&#39;%Y-%m-</span><span class="si">%d</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="k">while</span> <span class="n">current</span> <span class="o">&lt;=</span> <span class="n">end</span><span class="p">:</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">current</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s1">&#39;%Y-%m-</span><span class="si">%d</span><span class="s1">&#39;</span><span class="p">),)</span>
<span class="n">current</span> <span class="o">+=</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">days</span><span class="o">=</span><span class="mi">1</span><span class="p">)</span>
<span class="n">DateExpander</span><span class="p">(</span><span class="n">lit</span><span class="p">(</span><span class="s2">&quot;2023-02-25&quot;</span><span class="p">),</span> <span class="n">lit</span><span class="p">(</span><span class="s2">&quot;2023-03-01&quot;</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +----------+</span>
<span class="c1"># | date|</span>
<span class="c1"># +----------+</span>
<span class="c1"># |2023-02-25|</span>
<span class="c1"># |2023-02-26|</span>
<span class="c1"># |2023-02-27|</span>
<span class="c1"># |2023-02-28|</span>
<span class="c1"># |2023-03-01|</span>
<span class="c1"># +----------+</span>
</pre></div>
</div>
<p>A Python UDTF with <code class="docutils literal notranslate"><span class="pre">__init__</span></code> and <code class="docutils literal notranslate"><span class="pre">terminate</span></code>:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">udtf</span>
<span class="nd">@udtf</span><span class="p">(</span><span class="n">returnType</span><span class="o">=</span><span class="s2">&quot;cnt: int&quot;</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">CountUDTF</span><span class="p">:</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># Initialize the counter to 0 when an instance of the class is created.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">count</span> <span class="o">=</span> <span class="mi">0</span>
<span class="k">def</span> <span class="nf">eval</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">x</span><span class="p">:</span> <span class="nb">int</span><span class="p">):</span>
<span class="c1"># Increment the counter by 1 for each input value received.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">count</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="k">def</span> <span class="nf">terminate</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># Yield the final count when the UDTF is done processing.</span>
<span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">count</span><span class="p">,</span>
<span class="n">spark</span><span class="o">.</span><span class="n">udtf</span><span class="o">.</span><span class="n">register</span><span class="p">(</span><span class="s2">&quot;count_udtf&quot;</span><span class="p">,</span> <span class="n">CountUDTF</span><span class="p">)</span>
<span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">&quot;SELECT * FROM range(0, 10, 1, 1), LATERAL count_udtf(id)&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+---+</span>
<span class="c1"># | id|cnt|</span>
<span class="c1"># +---+---+</span>
<span class="c1"># | 9| 10|</span>
<span class="c1"># +---+---+</span>
<span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">&quot;SELECT * FROM range(0, 10, 1, 2), LATERAL count_udtf(id)&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+---+</span>
<span class="c1"># | id|cnt|</span>
<span class="c1"># +---+---+</span>
<span class="c1"># | 4| 5|</span>
<span class="c1"># | 9| 5|</span>
<span class="c1"># +---+---+</span>
</pre></div>
</div>
</div>
</div>
</div>
<!-- Previous / next page links. The angle icons are decorative (the adjacent
     "previous"/"next" text carries the meaning), so they are hidden from
     assistive technology with aria-hidden. -->
<div class="prev-next-area">
  <a class="left-prev" id="prev-link" href="arrow_pandas.html" title="previous page">
    <i class="fas fa-angle-left" aria-hidden="true"></i>
    <div class="prev-next-info">
      <p class="prev-next-subtitle">previous</p>
      <p class="prev-next-title">Apache Arrow in PySpark</p>
    </div>
  </a>
  <a class="right-next" id="next-link" href="../pandas_on_spark/index.html" title="next page">
    <div class="prev-next-info">
      <p class="prev-next-subtitle">next</p>
      <p class="prev-next-title">Pandas API on Spark</p>
    </div>
    <i class="fas fa-angle-right" aria-hidden="true"></i>
  </a>
</div>
</main>
</div>
</div>
<script src="../../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf"></script>
<!-- Page footer: copyright notice and documentation-generator credit. -->
<footer class="footer mt-5 mt-md-0">
  <div class="container">
    <div class="footer-item">
      <!-- NOTE(review): the copyright holder is blank in this generated output;
           presumably the generator's copyright setting is empty. Confirm and
           fix at the source rather than editing this file by hand. -->
      <p class="copyright">
        &copy; Copyright .<br>
      </p>
    </div>
    <div class="footer-item">
      <p class="sphinx-version">
        Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 3.0.4.<br>
      </p>
    </div>
  </div>
</footer>
</body>
</html>