blob: 378efefc515d83d8d1f7fd3282df1a98211f6909 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>pyspark.sql.udf &#8212; PySpark 4.0.0-preview1 documentation</title>
<script data-cfasync="false">
// Restore the user's persisted color scheme from localStorage before first
// paint so the page does not flash the wrong theme; falls back to "" (auto)
// for mode and "light" for theme when nothing has been saved yet.
document.documentElement.dataset.mode = localStorage.getItem("mode") || "";
document.documentElement.dataset.theme = localStorage.getItem("theme") || "light";
</script>
<!-- Loaded before other Sphinx assets -->
<link href="../../../_static/styles/theme.css?digest=e353d410970836974a52" rel="stylesheet" />
<link href="../../../_static/styles/bootstrap.css?digest=e353d410970836974a52" rel="stylesheet" />
<link href="../../../_static/styles/pydata-sphinx-theme.css?digest=e353d410970836974a52" rel="stylesheet" />
<link href="../../../_static/vendor/fontawesome/6.1.2/css/all.min.css?digest=e353d410970836974a52" rel="stylesheet" />
<link rel="preload" as="font" type="font/woff2" crossorigin href="../../../_static/vendor/fontawesome/6.1.2/webfonts/fa-solid-900.woff2" />
<link rel="preload" as="font" type="font/woff2" crossorigin href="../../../_static/vendor/fontawesome/6.1.2/webfonts/fa-brands-400.woff2" />
<link rel="preload" as="font" type="font/woff2" crossorigin href="../../../_static/vendor/fontawesome/6.1.2/webfonts/fa-regular-400.woff2" />
<link rel="stylesheet" type="text/css" href="../../../_static/pygments.css" />
<link rel="stylesheet" type="text/css" href="../../../_static/copybutton.css" />
<link rel="stylesheet" type="text/css" href="../../../_static/css/pyspark.css" />
<!-- Pre-loaded scripts that we'll load fully later -->
<link rel="preload" as="script" href="../../../_static/scripts/bootstrap.js?digest=e353d410970836974a52" />
<link rel="preload" as="script" href="../../../_static/scripts/pydata-sphinx-theme.js?digest=e353d410970836974a52" />
<script data-url_root="../../../" id="documentation_options" src="../../../_static/documentation_options.js"></script>
<script src="../../../_static/jquery.js"></script>
<script src="../../../_static/underscore.js"></script>
<script src="../../../_static/doctools.js"></script>
<script src="../../../_static/clipboard.min.js"></script>
<script src="../../../_static/copybutton.js"></script>
<script crossorigin="anonymous" integrity="sha256-Ae2Vz/4ePdIu6ZyI/5ZGsYnb+m0JlOmKPjt6XZ9JJkA=" src="https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.4/require.min.js"></script>
<script>DOCUMENTATION_OPTIONS.pagename = '_modules/pyspark/sql/udf';</script>
<link rel="canonical" href="https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/udf.html" />
<link rel="search" title="Search" href="../../../search.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="en">
<!-- Matomo -->
<script type="text/javascript">
var _paq = window._paq = window._paq || [];
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
// Cookieless tracking is enabled before the first pageview is recorded.
_paq.push(["disableCookies"]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '40']);
// Inject the Matomo loader script asynchronously, ahead of the first
// existing <script>, so analytics never blocks page rendering.
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
<!-- End Matomo Code -->
</head>
<body data-bs-spy="scroll" data-bs-target=".bd-toc-nav" data-offset="180" data-bs-root-margin="0px 0px -60%" data-default-mode="">
<a class="skip-link" href="#main-content">Skip to main content</a>
<input type="checkbox"
class="sidebar-toggle"
name="__primary"
id="__primary"/>
<label class="overlay overlay-primary" for="__primary"></label>
<input type="checkbox"
class="sidebar-toggle"
name="__secondary"
id="__secondary"/>
<label class="overlay overlay-secondary" for="__secondary"></label>
<div class="search-button__wrapper">
<div class="search-button__overlay"></div>
<div class="search-button__search-container">
<form class="bd-search d-flex align-items-center"
action="../../../search.html"
method="get">
<i class="fa-solid fa-magnifying-glass"></i>
<input type="search"
class="form-control"
name="q"
id="search-input"
placeholder="Search the docs ..."
aria-label="Search the docs ..."
autocomplete="off"
autocorrect="off"
autocapitalize="off"
spellcheck="false"/>
<span class="search-button__kbd-shortcut"><kbd class="kbd-shortcut__modifier">Ctrl</kbd>+<kbd>K</kbd></span>
</form></div>
</div>
<nav class="bd-header navbar navbar-expand-lg bd-navbar">
<div class="bd-header__inner bd-page-width">
<label class="sidebar-toggle primary-toggle" for="__primary">
<span class="fa-solid fa-bars"></span>
</label>
<div class="navbar-header-items__start">
<div class="navbar-item">
<a class="navbar-brand logo" href="../../../index.html">
<img src="../../../_static/spark-logo-light.png" class="logo__image only-light" alt="Logo image"/>
<script>document.write(`<img src="../../../_static/spark-logo-dark.png" class="logo__image only-dark" alt="Logo image"/>`);</script>
</a></div>
</div>
<div class="col-lg-9 navbar-header-items">
<div class="me-auto navbar-header-items__center">
<div class="navbar-item"><nav class="navbar-nav">
<p class="sidebar-header-items__title"
role="heading"
aria-level="1"
aria-label="Site Navigation">
Site Navigation
</p>
<ul class="bd-navbar-elements navbar-nav">
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../index.html">
Overview
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../user_guide/index.html">
User Guides
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../reference/index.html">
API Reference
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../development/index.html">
Development
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</nav></div>
</div>
<div class="navbar-header-items__end">
<div class="navbar-item navbar-persistent--container">
<script>
document.write(`
<button class="btn btn-sm navbar-btn search-button search-button__button" title="Search" aria-label="Search" data-bs-placement="bottom" data-bs-toggle="tooltip">
<i class="fa-solid fa-magnifying-glass"></i>
</button>
`);
</script>
</div>
<div class="navbar-item"><!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
<button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-toggle="dropdown">
4.0.0-preview1
<span class="caret"></span>
</button>
<div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
<!-- dropdown will be populated by javascript on page load -->
</div>
</div>
<script type="text/javascript">
// Function to construct the target URL from the JSON components
function buildURL(entry) {
var template = "https://spark.apache.org/docs/{version}/api/python/index.html"; // supplied by jinja
template = template.replace("{version}", entry.version);
return template;
}
// Function to check if corresponding page path exists in other version of docs
// and, if so, go there instead of the homepage of the other docs version
// NOTE(review): the HEAD probe is cross-origin; if CORS blocks it, the .fail()
// handler below sends the user to the other version's homepage instead —
// confirm that spark.apache.org permits these requests.
function checkPageExistsAndRedirect(event) {
const currentFilePath = "_modules/pyspark/sql/udf.html",
otherDocsHomepage = event.target.getAttribute("href");
let tryUrl = `${otherDocsHomepage}${currentFilePath}`;
$.ajax({
type: 'HEAD',
url: tryUrl,
// if the page exists, go there
success: function() {
location.href = tryUrl;
}
}).fail(function() {
location.href = otherDocsHomepage;
});
// Returning false from an onclick-assigned handler cancels the anchor's
// default navigation while the asynchronous probe decides where to go.
return false;
}
// Function to populate the version switcher
// NOTE(review): this script and the ids "version-button"/"version_switcher"
// appear twice on the page (header and mobile sidebar). Since ids must be
// unique, jQuery's "#version_switcher" selector matches only the first
// dropdown, so both copies of this script append their links there — verify
// whether the theme intends the duplicate-id markup.
(function () {
// get JSON config
$.getJSON("https://spark.apache.org/static/versions.json", function(data, textStatus, jqXHR) {
// create the nodes first (before AJAX calls) to ensure the order is
// correct (for now, links will go to doc version homepage)
$.each(data, function(index, entry) {
// if no custom name specified (e.g., "latest"), use version string
if (!("name" in entry)) {
entry.name = entry.version;
}
// construct the appropriate URL, and add it to the dropdown
entry.url = buildURL(entry);
const node = document.createElement("a");
node.setAttribute("class", "list-group-item list-group-item-action py-1");
node.setAttribute("href", `${entry.url}`);
node.textContent = `${entry.name}`;
node.onclick = checkPageExistsAndRedirect;
$("#version_switcher").append(node);
});
});
})();
</script></div>
<div class="navbar-item">
<script>
document.write(`
<button class="theme-switch-button btn btn-sm btn-outline-primary navbar-btn rounded-circle" title="light/dark" aria-label="light/dark" data-bs-placement="bottom" data-bs-toggle="tooltip">
<span class="theme-switch" data-mode="light"><i class="fa-solid fa-sun"></i></span>
<span class="theme-switch" data-mode="dark"><i class="fa-solid fa-moon"></i></span>
<span class="theme-switch" data-mode="auto"><i class="fa-solid fa-circle-half-stroke"></i></span>
</button>
`);
</script></div>
<div class="navbar-item"><ul class="navbar-icon-links navbar-nav"
aria-label="Icon Links">
<li class="nav-item">
<a href="https://github.com/apache/spark" title="GitHub" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-brands fa-github"></i></span>
<span class="sr-only">GitHub</span></a>
</li>
<li class="nav-item">
<a href="https://pypi.org/project/pyspark" title="PyPI" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-solid fa-box"></i></span>
<span class="sr-only">PyPI</span></a>
</li>
</ul></div>
</div>
</div>
<div class="navbar-persistent--mobile">
<script>
document.write(`
<button class="btn btn-sm navbar-btn search-button search-button__button" title="Search" aria-label="Search" data-bs-placement="bottom" data-bs-toggle="tooltip">
<i class="fa-solid fa-magnifying-glass"></i>
</button>
`);
</script>
</div>
</div>
</nav>
<div class="bd-container">
<div class="bd-container__inner bd-page-width">
<div class="bd-sidebar-primary bd-sidebar hide-on-wide">
<div class="sidebar-header-items sidebar-primary__section">
<div class="sidebar-header-items__center">
<div class="navbar-item"><nav class="navbar-nav">
<p class="sidebar-header-items__title"
role="heading"
aria-level="1"
aria-label="Site Navigation">
Site Navigation
</p>
<ul class="bd-navbar-elements navbar-nav">
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../index.html">
Overview
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../user_guide/index.html">
User Guides
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../reference/index.html">
API Reference
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../development/index.html">
Development
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</nav></div>
</div>
<div class="sidebar-header-items__end">
<div class="navbar-item"><!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
<button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-toggle="dropdown">
4.0.0-preview1
<span class="caret"></span>
</button>
<div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
<!-- dropdown will be populated by javascript on page load -->
</div>
</div>
<script type="text/javascript">
// Function to construct the target URL from the JSON components
function buildURL(entry) {
var template = "https://spark.apache.org/docs/{version}/api/python/index.html"; // supplied by jinja
template = template.replace("{version}", entry.version);
return template;
}
// Function to check if corresponding page path exists in other version of docs
// and, if so, go there instead of the homepage of the other docs version
// NOTE(review): the HEAD probe is cross-origin; if CORS blocks it, the .fail()
// handler below sends the user to the other version's homepage instead —
// confirm that spark.apache.org permits these requests.
function checkPageExistsAndRedirect(event) {
const currentFilePath = "_modules/pyspark/sql/udf.html",
otherDocsHomepage = event.target.getAttribute("href");
let tryUrl = `${otherDocsHomepage}${currentFilePath}`;
$.ajax({
type: 'HEAD',
url: tryUrl,
// if the page exists, go there
success: function() {
location.href = tryUrl;
}
}).fail(function() {
location.href = otherDocsHomepage;
});
// Returning false from an onclick-assigned handler cancels the anchor's
// default navigation while the asynchronous probe decides where to go.
return false;
}
// Function to populate the version switcher
// NOTE(review): this is the second copy of this script on the page (the first
// is in the header), and it redeclares buildURL/checkPageExistsAndRedirect.
// Because "#version_switcher" matches only the first element with that id,
// this run also appends its links to the header dropdown, leaving the sidebar
// dropdown empty and duplicating entries — verify against the theme's intent.
(function () {
// get JSON config
$.getJSON("https://spark.apache.org/static/versions.json", function(data, textStatus, jqXHR) {
// create the nodes first (before AJAX calls) to ensure the order is
// correct (for now, links will go to doc version homepage)
$.each(data, function(index, entry) {
// if no custom name specified (e.g., "latest"), use version string
if (!("name" in entry)) {
entry.name = entry.version;
}
// construct the appropriate URL, and add it to the dropdown
entry.url = buildURL(entry);
const node = document.createElement("a");
node.setAttribute("class", "list-group-item list-group-item-action py-1");
node.setAttribute("href", `${entry.url}`);
node.textContent = `${entry.name}`;
node.onclick = checkPageExistsAndRedirect;
$("#version_switcher").append(node);
});
});
})();
</script></div>
<div class="navbar-item">
<script>
document.write(`
<button class="theme-switch-button btn btn-sm btn-outline-primary navbar-btn rounded-circle" title="light/dark" aria-label="light/dark" data-bs-placement="bottom" data-bs-toggle="tooltip">
<span class="theme-switch" data-mode="light"><i class="fa-solid fa-sun"></i></span>
<span class="theme-switch" data-mode="dark"><i class="fa-solid fa-moon"></i></span>
<span class="theme-switch" data-mode="auto"><i class="fa-solid fa-circle-half-stroke"></i></span>
</button>
`);
</script></div>
<div class="navbar-item"><ul class="navbar-icon-links navbar-nav"
aria-label="Icon Links">
<li class="nav-item">
<a href="https://github.com/apache/spark" title="GitHub" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-brands fa-github"></i></span>
<span class="sr-only">GitHub</span></a>
</li>
<li class="nav-item">
<a href="https://pypi.org/project/pyspark" title="PyPI" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-solid fa-box"></i></span>
<span class="sr-only">PyPI</span></a>
</li>
</ul></div>
</div>
</div>
<div class="sidebar-primary-items__end sidebar-primary__section">
</div>
<div id="rtd-footer-container"></div>
</div>
<main id="main-content" class="bd-main">
<div class="bd-content">
<div class="bd-article-container">
<div class="bd-header-article">
<div class="header-article-items header-article__inner">
<div class="header-article-items__start">
<div class="header-article-item">
<nav aria-label="Breadcrumbs">
<ul class="bd-breadcrumbs" role="navigation" aria-label="Breadcrumb">
<li class="breadcrumb-item breadcrumb-home">
<a href="../../../index.html" class="nav-link" aria-label="Home">
<i class="fa-solid fa-home"></i>
</a>
</li>
<li class="breadcrumb-item"><a href="../../index.html" class="nav-link">Module code</a></li>
<li class="breadcrumb-item active" aria-current="page">pyspark.sql.udf</li>
</ul>
</nav>
</div>
</div>
</div>
</div>
<div id="searchbox"></div>
<article class="bd-article" role="main">
<h1>Source code for pyspark.sql.udf</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd">User-defined function related classes and functions</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">inspect</span> <span class="kn">import</span> <span class="n">getfullargspec</span>
<span class="kn">import</span> <span class="nn">functools</span>
<span class="kn">import</span> <span class="nn">inspect</span>
<span class="kn">import</span> <span class="nn">sys</span>
<span class="kn">import</span> <span class="nn">warnings</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Callable</span><span class="p">,</span> <span class="n">Any</span><span class="p">,</span> <span class="n">TYPE_CHECKING</span><span class="p">,</span> <span class="n">Optional</span><span class="p">,</span> <span class="n">cast</span><span class="p">,</span> <span class="n">Union</span>
<span class="kn">from</span> <span class="nn">pyspark.util</span> <span class="kn">import</span> <span class="n">PythonEvalType</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.column</span> <span class="kn">import</span> <span class="n">Column</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="kn">import</span> <span class="p">(</span>
<span class="n">DataType</span><span class="p">,</span>
<span class="n">StringType</span><span class="p">,</span>
<span class="n">StructType</span><span class="p">,</span>
<span class="n">_parse_datatype_string</span><span class="p">,</span>
<span class="p">)</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.utils</span> <span class="kn">import</span> <span class="n">get_active_spark_context</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.pandas.types</span> <span class="kn">import</span> <span class="n">to_arrow_type</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.pandas.utils</span> <span class="kn">import</span> <span class="n">require_minimum_pandas_version</span><span class="p">,</span> <span class="n">require_minimum_pyarrow_version</span>
<span class="kn">from</span> <span class="nn">pyspark.errors</span> <span class="kn">import</span> <span class="n">PySparkTypeError</span><span class="p">,</span> <span class="n">PySparkNotImplementedError</span><span class="p">,</span> <span class="n">PySparkRuntimeError</span>
<span class="k">if</span> <span class="n">TYPE_CHECKING</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">py4j.java_gateway</span> <span class="kn">import</span> <span class="n">JavaObject</span>
<span class="kn">from</span> <span class="nn">pyspark.core.context</span> <span class="kn">import</span> <span class="n">SparkContext</span>
<span class="kn">from</span> <span class="nn">pyspark.sql._typing</span> <span class="kn">import</span> <span class="n">DataTypeOrString</span><span class="p">,</span> <span class="n">ColumnOrName</span><span class="p">,</span> <span class="n">UserDefinedFunctionLike</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.session</span> <span class="kn">import</span> <span class="n">SparkSession</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s2">&quot;UDFRegistration&quot;</span><span class="p">]</span>
<span class="k">def</span> <span class="nf">_wrap_function</span><span class="p">(</span>
<span class="n">sc</span><span class="p">:</span> <span class="s2">&quot;SparkContext&quot;</span><span class="p">,</span> <span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[</span><span class="o">...</span><span class="p">,</span> <span class="n">Any</span><span class="p">],</span> <span class="n">returnType</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">DataType</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;JavaObject&quot;</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">pyspark.core.rdd</span> <span class="kn">import</span> <span class="n">_prepare_for_python_RDD</span>
<span class="n">command</span><span class="p">:</span> <span class="n">Any</span>
<span class="k">if</span> <span class="n">returnType</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">command</span> <span class="o">=</span> <span class="n">func</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">command</span> <span class="o">=</span> <span class="p">(</span><span class="n">func</span><span class="p">,</span> <span class="n">returnType</span><span class="p">)</span>
<span class="n">pickled_command</span><span class="p">,</span> <span class="n">broadcast_vars</span><span class="p">,</span> <span class="n">env</span><span class="p">,</span> <span class="n">includes</span> <span class="o">=</span> <span class="n">_prepare_for_python_RDD</span><span class="p">(</span><span class="n">sc</span><span class="p">,</span> <span class="n">command</span><span class="p">)</span>
<span class="k">assert</span> <span class="n">sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="k">return</span> <span class="n">sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">SimplePythonFunction</span><span class="p">(</span>
<span class="nb">bytearray</span><span class="p">(</span><span class="n">pickled_command</span><span class="p">),</span>
<span class="n">env</span><span class="p">,</span>
<span class="n">includes</span><span class="p">,</span>
<span class="n">sc</span><span class="o">.</span><span class="n">pythonExec</span><span class="p">,</span>
<span class="n">sc</span><span class="o">.</span><span class="n">pythonVer</span><span class="p">,</span>
<span class="n">broadcast_vars</span><span class="p">,</span>
<span class="n">sc</span><span class="o">.</span><span class="n">_javaAccumulator</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">def</span> <span class="nf">_create_udf</span><span class="p">(</span>
<span class="n">f</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[</span><span class="o">...</span><span class="p">,</span> <span class="n">Any</span><span class="p">],</span>
<span class="n">returnType</span><span class="p">:</span> <span class="s2">&quot;DataTypeOrString&quot;</span><span class="p">,</span>
<span class="n">evalType</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">name</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">deterministic</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">True</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;UserDefinedFunctionLike&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Create a regular(non-Arrow-optimized) Python UDF.&quot;&quot;&quot;</span>
<span class="c1"># Set the name of the UserDefinedFunction object to be the name of function f</span>
<span class="n">udf_obj</span> <span class="o">=</span> <span class="n">UserDefinedFunction</span><span class="p">(</span>
<span class="n">f</span><span class="p">,</span> <span class="n">returnType</span><span class="o">=</span><span class="n">returnType</span><span class="p">,</span> <span class="n">name</span><span class="o">=</span><span class="n">name</span><span class="p">,</span> <span class="n">evalType</span><span class="o">=</span><span class="n">evalType</span><span class="p">,</span> <span class="n">deterministic</span><span class="o">=</span><span class="n">deterministic</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">udf_obj</span><span class="o">.</span><span class="n">_wrapped</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">_create_py_udf</span><span class="p">(</span>
<span class="n">f</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[</span><span class="o">...</span><span class="p">,</span> <span class="n">Any</span><span class="p">],</span>
<span class="n">returnType</span><span class="p">:</span> <span class="s2">&quot;DataTypeOrString&quot;</span><span class="p">,</span>
<span class="n">useArrow</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;UserDefinedFunctionLike&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Create a regular/Arrow-optimized Python UDF.&quot;&quot;&quot;</span>
<span class="c1"># The following table shows the results when the type coercion in Arrow is needed, that is,</span>
<span class="c1"># when the user-specified return type(SQL Type) of the UDF and the actual instance(Python</span>
<span class="c1"># Value(Type)) that the UDF returns are different.</span>
<span class="c1"># Arrow and Pickle have different type coercion rules, so a UDF might have a different result</span>
<span class="c1"># with/without Arrow optimization. That&#39;s the main reason the Arrow optimization for Python</span>
<span class="c1"># UDFs is disabled by default.</span>
<span class="c1"># +-----------------------------+--------------+----------+------+---------------+--------------------+-----------------------------+----------+----------------------+---------+--------------------+----------------------------+------------+--------------+ # noqa</span>
<span class="c1"># |SQL Type \ Python Value(Type)|None(NoneType)|True(bool)|1(int)| a(str)| 1970-01-01(date)|1970-01-01 00:00:00(datetime)|1.0(float)|array(&#39;i&#39;, [1])(array)|[1](list)| (1,)(tuple)|bytearray(b&#39;ABC&#39;)(bytearray)| 1(Decimal)|{&#39;a&#39;: 1}(dict)| # noqa</span>
<span class="c1"># +-----------------------------+--------------+----------+------+---------------+--------------------+-----------------------------+----------+----------------------+---------+--------------------+----------------------------+------------+--------------+ # noqa</span>
<span class="c1"># | boolean| None| True| None| None| None| None| None| None| None| None| None| None| None| # noqa</span>
<span class="c1"># | tinyint| None| None| 1| None| None| None| None| None| None| None| None| None| None| # noqa</span>
<span class="c1"># | smallint| None| None| 1| None| None| None| None| None| None| None| None| None| None| # noqa</span>
<span class="c1"># | int| None| None| 1| None| None| None| None| None| None| None| None| None| None| # noqa</span>
<span class="c1"># | bigint| None| None| 1| None| None| None| None| None| None| None| None| None| None| # noqa</span>
<span class="c1"># | string| None| &#39;true&#39;| &#39;1&#39;| &#39;a&#39;|&#39;java.util.Gregor...| &#39;java.util.Gregor...| &#39;1.0&#39;| &#39;[I@120d813a&#39;| &#39;[1]&#39;|&#39;[Ljava.lang.Obje...| &#39;[B@48571878&#39;| &#39;1&#39;| &#39;{a=1}&#39;| # noqa</span>
<span class="c1"># | date| None| X| X| X|datetime.date(197...| datetime.date(197...| X| X| X| X| X| X| X| # noqa</span>
<span class="c1"># | timestamp| None| X| X| X| X| datetime.datetime...| X| X| X| X| X| X| X| # noqa</span>
<span class="c1"># | float| None| None| None| None| None| None| 1.0| None| None| None| None| None| None| # noqa</span>
<span class="c1"># | double| None| None| None| None| None| None| 1.0| None| None| None| None| None| None| # noqa</span>
<span class="c1"># | binary| None| None| None|bytearray(b&#39;a&#39;)| None| None| None| None| None| None| bytearray(b&#39;ABC&#39;)| None| None| # noqa</span>
<span class="c1"># | decimal(10,0)| None| None| None| None| None| None| None| None| None| None| None|Decimal(&#39;1&#39;)| None| # noqa</span>
<span class="c1"># +-----------------------------+--------------+----------+------+---------------+--------------------+-----------------------------+----------+----------------------+---------+--------------------+----------------------------+------------+--------------+ # noqa</span>
<span class="c1"># Note: Python 3.9.15, Pandas 1.5.2 and PyArrow 10.0.1 are used.</span>
<span class="c1"># Note: The values of &#39;SQL Type&#39; are DDL formatted strings, which can be used as `returnType`s.</span>
<span class="c1"># Note: The values inside the table are generated by `repr`. X&#39; means it throws an exception</span>
<span class="c1"># during the conversion.</span>
<span class="k">if</span> <span class="n">useArrow</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span>
<span class="n">session</span> <span class="o">=</span> <span class="n">SparkSession</span><span class="o">.</span><span class="n">_instantiatedSession</span>
<span class="n">is_arrow_enabled</span> <span class="o">=</span> <span class="p">(</span>
<span class="kc">False</span>
<span class="k">if</span> <span class="n">session</span> <span class="ow">is</span> <span class="kc">None</span>
<span class="k">else</span> <span class="n">session</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">&quot;spark.sql.execution.pythonUDF.arrow.enabled&quot;</span><span class="p">)</span> <span class="o">==</span> <span class="s2">&quot;true&quot;</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">is_arrow_enabled</span> <span class="o">=</span> <span class="n">useArrow</span>
<span class="n">eval_type</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_BATCHED_UDF</span>
<span class="k">if</span> <span class="n">is_arrow_enabled</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">is_func_with_args</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">getfullargspec</span><span class="p">(</span><span class="n">f</span><span class="p">)</span><span class="o">.</span><span class="n">args</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">0</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="n">is_func_with_args</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">if</span> <span class="n">is_func_with_args</span><span class="p">:</span>
<span class="n">require_minimum_pandas_version</span><span class="p">()</span>
<span class="n">require_minimum_pyarrow_version</span><span class="p">()</span>
<span class="n">eval_type</span> <span class="o">=</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_ARROW_BATCHED_UDF</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">warnings</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span>
<span class="s2">&quot;Arrow optimization for Python UDFs cannot be enabled for functions&quot;</span>
<span class="s2">&quot; without arguments.&quot;</span><span class="p">,</span>
<span class="ne">UserWarning</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">_create_udf</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="n">returnType</span><span class="p">,</span> <span class="n">eval_type</span><span class="p">)</span>
<div class="viewcode-block" id="UserDefinedFunction"><a class="viewcode-back" href="../../../reference/pyspark.sql/api/pyspark.sql.udf.UserDefinedFunction.html#pyspark.sql.UserDefinedFunction">[docs]</a><span class="k">class</span> <span class="nc">UserDefinedFunction</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> User defined function in Python</span>
<span class="sd"> .. versionadded:: 1.3</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> The constructor of this class is not supposed to be directly called.</span>
<span class="sd"> Use :meth:`pyspark.sql.functions.udf` or :meth:`pyspark.sql.functions.pandas_udf`</span>
<span class="sd"> to create this instance.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[</span><span class="o">...</span><span class="p">,</span> <span class="n">Any</span><span class="p">],</span>
<span class="n">returnType</span><span class="p">:</span> <span class="s2">&quot;DataTypeOrString&quot;</span> <span class="o">=</span> <span class="n">StringType</span><span class="p">(),</span>
<span class="n">name</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">evalType</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_BATCHED_UDF</span><span class="p">,</span>
<span class="n">deterministic</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">True</span><span class="p">,</span>
<span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">callable</span><span class="p">(</span><span class="n">func</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">PySparkTypeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;NOT_CALLABLE&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;arg_name&quot;</span><span class="p">:</span> <span class="s2">&quot;func&quot;</span><span class="p">,</span> <span class="s2">&quot;arg_type&quot;</span><span class="p">:</span> <span class="nb">type</span><span class="p">(</span><span class="n">func</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">},</span>
<span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">returnType</span><span class="p">,</span> <span class="p">(</span><span class="n">DataType</span><span class="p">,</span> <span class="nb">str</span><span class="p">)):</span>
<span class="k">raise</span> <span class="n">PySparkTypeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;NOT_DATATYPE_OR_STR&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;arg_name&quot;</span><span class="p">:</span> <span class="s2">&quot;returnType&quot;</span><span class="p">,</span>
<span class="s2">&quot;arg_type&quot;</span><span class="p">:</span> <span class="nb">type</span><span class="p">(</span><span class="n">returnType</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">evalType</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">PySparkTypeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;NOT_INT&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;arg_name&quot;</span><span class="p">:</span> <span class="s2">&quot;evalType&quot;</span><span class="p">,</span> <span class="s2">&quot;arg_type&quot;</span><span class="p">:</span> <span class="nb">type</span><span class="p">(</span><span class="n">evalType</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">},</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">func</span> <span class="o">=</span> <span class="n">func</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_returnType</span> <span class="o">=</span> <span class="n">returnType</span>
<span class="c1"># Stores UserDefinedPythonFunctions jobj, once initialized</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_returnType_placeholder</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">DataType</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_judf_placeholder</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_name</span> <span class="o">=</span> <span class="n">name</span> <span class="ow">or</span> <span class="p">(</span>
<span class="n">func</span><span class="o">.</span><span class="vm">__name__</span> <span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">func</span><span class="p">,</span> <span class="s2">&quot;__name__&quot;</span><span class="p">)</span> <span class="k">else</span> <span class="n">func</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">evalType</span> <span class="o">=</span> <span class="n">evalType</span>
<span class="bp">self</span><span class="o">.</span><span class="n">deterministic</span> <span class="o">=</span> <span class="n">deterministic</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">_check_return_type</span><span class="p">(</span><span class="n">returnType</span><span class="p">:</span> <span class="n">DataType</span><span class="p">,</span> <span class="n">evalType</span><span class="p">:</span> <span class="nb">int</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="n">evalType</span> <span class="o">==</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_ARROW_BATCHED_UDF</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">to_arrow_type</span><span class="p">(</span><span class="n">returnType</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkNotImplementedError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;NOT_IMPLEMENTED&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;feature&quot;</span><span class="p">:</span> <span class="sa">f</span><span class="s2">&quot;Invalid return type with Arrow-optimized Python UDF: &quot;</span>
<span class="sa">f</span><span class="s2">&quot;</span><span class="si">{</span><span class="n">returnType</span><span class="si">}</span><span class="s2">&quot;</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="p">(</span>
<span class="n">evalType</span> <span class="o">==</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_SCALAR_PANDAS_UDF</span>
<span class="ow">or</span> <span class="n">evalType</span> <span class="o">==</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_SCALAR_PANDAS_ITER_UDF</span>
<span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">to_arrow_type</span><span class="p">(</span><span class="n">returnType</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkNotImplementedError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;NOT_IMPLEMENTED&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;feature&quot;</span><span class="p">:</span> <span class="sa">f</span><span class="s2">&quot;Invalid return type with scalar Pandas UDFs: &quot;</span> <span class="sa">f</span><span class="s2">&quot;</span><span class="si">{</span><span class="n">returnType</span><span class="si">}</span><span class="s2">&quot;</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="p">(</span>
<span class="n">evalType</span> <span class="o">==</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_GROUPED_MAP_PANDAS_UDF</span>
<span class="ow">or</span> <span class="n">evalType</span> <span class="o">==</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE</span>
<span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">returnType</span><span class="p">,</span> <span class="n">StructType</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">to_arrow_type</span><span class="p">(</span><span class="n">returnType</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkNotImplementedError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;NOT_IMPLEMENTED&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;feature&quot;</span><span class="p">:</span> <span class="sa">f</span><span class="s2">&quot;Invalid return type with grouped map Pandas UDFs or &quot;</span>
<span class="sa">f</span><span class="s2">&quot;at groupby.applyInPandas(WithState): </span><span class="si">{</span><span class="n">returnType</span><span class="si">}</span><span class="s2">&quot;</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkTypeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_RETURN_TYPE_FOR_PANDAS_UDF&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;eval_type&quot;</span><span class="p">:</span> <span class="s2">&quot;SQL_GROUPED_MAP_PANDAS_UDF or &quot;</span>
<span class="s2">&quot;SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE&quot;</span><span class="p">,</span>
<span class="s2">&quot;return_type&quot;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">returnType</span><span class="p">),</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="p">(</span>
<span class="n">evalType</span> <span class="o">==</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_MAP_PANDAS_ITER_UDF</span>
<span class="ow">or</span> <span class="n">evalType</span> <span class="o">==</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_MAP_ARROW_ITER_UDF</span>
<span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">returnType</span><span class="p">,</span> <span class="n">StructType</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">to_arrow_type</span><span class="p">(</span><span class="n">returnType</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkNotImplementedError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;NOT_IMPLEMENTED&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;feature&quot;</span><span class="p">:</span> <span class="sa">f</span><span class="s2">&quot;Invalid return type in mapInPandas: &quot;</span> <span class="sa">f</span><span class="s2">&quot;</span><span class="si">{</span><span class="n">returnType</span><span class="si">}</span><span class="s2">&quot;</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkTypeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_RETURN_TYPE_FOR_PANDAS_UDF&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;eval_type&quot;</span><span class="p">:</span> <span class="s2">&quot;SQL_MAP_PANDAS_ITER_UDF or SQL_MAP_ARROW_ITER_UDF&quot;</span><span class="p">,</span>
<span class="s2">&quot;return_type&quot;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">returnType</span><span class="p">),</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="n">evalType</span> <span class="o">==</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_GROUPED_MAP_ARROW_UDF</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">returnType</span><span class="p">,</span> <span class="n">StructType</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">to_arrow_type</span><span class="p">(</span><span class="n">returnType</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkNotImplementedError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;NOT_IMPLEMENTED&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;feature&quot;</span><span class="p">:</span> <span class="s2">&quot;Invalid return type with grouped map Arrow UDFs or &quot;</span>
<span class="sa">f</span><span class="s2">&quot;at groupby.applyInArrow: </span><span class="si">{</span><span class="n">returnType</span><span class="si">}</span><span class="s2">&quot;</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkTypeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_RETURN_TYPE_FOR_ARROW_UDF&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;eval_type&quot;</span><span class="p">:</span> <span class="s2">&quot;SQL_GROUPED_MAP_ARROW_UDF&quot;</span><span class="p">,</span>
<span class="s2">&quot;return_type&quot;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">returnType</span><span class="p">),</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="n">evalType</span> <span class="o">==</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_COGROUPED_MAP_PANDAS_UDF</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">returnType</span><span class="p">,</span> <span class="n">StructType</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">to_arrow_type</span><span class="p">(</span><span class="n">returnType</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkNotImplementedError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;NOT_IMPLEMENTED&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;feature&quot;</span><span class="p">:</span> <span class="sa">f</span><span class="s2">&quot;Invalid return type in cogroup.applyInPandas: &quot;</span>
<span class="sa">f</span><span class="s2">&quot;</span><span class="si">{</span><span class="n">returnType</span><span class="si">}</span><span class="s2">&quot;</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkTypeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_RETURN_TYPE_FOR_PANDAS_UDF&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;eval_type&quot;</span><span class="p">:</span> <span class="s2">&quot;SQL_COGROUPED_MAP_PANDAS_UDF&quot;</span><span class="p">,</span>
<span class="s2">&quot;return_type&quot;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">returnType</span><span class="p">),</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="n">evalType</span> <span class="o">==</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_COGROUPED_MAP_ARROW_UDF</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">returnType</span><span class="p">,</span> <span class="n">StructType</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">to_arrow_type</span><span class="p">(</span><span class="n">returnType</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkNotImplementedError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;NOT_IMPLEMENTED&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;feature&quot;</span><span class="p">:</span> <span class="s2">&quot;Invalid return type in cogroup.applyInArrow: &quot;</span>
<span class="sa">f</span><span class="s2">&quot;</span><span class="si">{</span><span class="n">returnType</span><span class="si">}</span><span class="s2">&quot;</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkTypeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_RETURN_TYPE_FOR_ARROW_UDF&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;eval_type&quot;</span><span class="p">:</span> <span class="s2">&quot;SQL_COGROUPED_MAP_ARROW_UDF&quot;</span><span class="p">,</span>
<span class="s2">&quot;return_type&quot;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">returnType</span><span class="p">),</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="n">evalType</span> <span class="o">==</span> <span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_GROUPED_AGG_PANDAS_UDF</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="c1"># StructType is not yet allowed as a return type, explicitly check here to fail fast</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">returnType</span><span class="p">,</span> <span class="n">StructType</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">PySparkNotImplementedError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;NOT_IMPLEMENTED&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;feature&quot;</span><span class="p">:</span> <span class="sa">f</span><span class="s2">&quot;Invalid return type with grouped aggregate Pandas UDFs: &quot;</span>
<span class="sa">f</span><span class="s2">&quot;</span><span class="si">{</span><span class="n">returnType</span><span class="si">}</span><span class="s2">&quot;</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="n">to_arrow_type</span><span class="p">(</span><span class="n">returnType</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkNotImplementedError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;NOT_IMPLEMENTED&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;feature&quot;</span><span class="p">:</span> <span class="sa">f</span><span class="s2">&quot;Invalid return type with grouped aggregate Pandas UDFs: &quot;</span>
<span class="sa">f</span><span class="s2">&quot;</span><span class="si">{</span><span class="n">returnType</span><span class="si">}</span><span class="s2">&quot;</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">returnType</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">DataType</span><span class="p">:</span>
<span class="c1"># Make sure this is called after SparkContext is initialized.</span>
<span class="c1"># ``_parse_datatype_string`` accesses to JVM for parsing a DDL formatted string.</span>
<span class="c1"># TODO: PythonEvalType.SQL_BATCHED_UDF</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_returnType_placeholder</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_returnType</span><span class="p">,</span> <span class="n">DataType</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_returnType_placeholder</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_returnType</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_returnType_placeholder</span> <span class="o">=</span> <span class="n">_parse_datatype_string</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_returnType</span><span class="p">)</span>
<span class="n">UserDefinedFunction</span><span class="o">.</span><span class="n">_check_return_type</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_returnType_placeholder</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">evalType</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_returnType_placeholder</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">_judf</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;JavaObject&quot;</span><span class="p">:</span>
<span class="c1"># It is possible that concurrent access, to newly created UDF,</span>
<span class="c1"># will initialize multiple UserDefinedPythonFunctions.</span>
<span class="c1"># This is unlikely, doesn&#39;t affect correctness,</span>
<span class="c1"># and should have a minimal performance impact.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_judf_placeholder</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_judf_placeholder</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_create_judf</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">func</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_judf_placeholder</span>
<span class="k">def</span> <span class="nf">_create_judf</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[</span><span class="o">...</span><span class="p">,</span> <span class="n">Any</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;JavaObject&quot;</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span>
<span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span><span class="o">.</span><span class="n">_getActiveSessionOrCreate</span><span class="p">()</span>
<span class="n">sc</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span>
<span class="n">wrapped_func</span> <span class="o">=</span> <span class="n">_wrap_function</span><span class="p">(</span><span class="n">sc</span><span class="p">,</span> <span class="n">func</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">returnType</span><span class="p">)</span>
<span class="n">jdt</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">_jsparkSession</span><span class="o">.</span><span class="n">parseDataType</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">returnType</span><span class="o">.</span><span class="n">json</span><span class="p">())</span>
<span class="k">assert</span> <span class="n">sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">judf</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">execution</span><span class="o">.</span><span class="n">python</span><span class="o">.</span><span class="n">UserDefinedPythonFunction</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_name</span><span class="p">,</span> <span class="n">wrapped_func</span><span class="p">,</span> <span class="n">jdt</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">evalType</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">deterministic</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">judf</span>
<span class="k">def</span> <span class="fm">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">:</span> <span class="s2">&quot;ColumnOrName&quot;</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="s2">&quot;ColumnOrName&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Column</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.classic.column</span> <span class="kn">import</span> <span class="n">_to_java_expr</span><span class="p">,</span> <span class="n">_to_seq</span>
<span class="n">sc</span> <span class="o">=</span> <span class="n">get_active_spark_context</span><span class="p">()</span>
<span class="k">assert</span> <span class="n">sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">jexprs</span> <span class="o">=</span> <span class="p">[</span><span class="n">_to_java_expr</span><span class="p">(</span><span class="n">arg</span><span class="p">)</span> <span class="k">for</span> <span class="n">arg</span> <span class="ow">in</span> <span class="n">args</span><span class="p">]</span> <span class="o">+</span> <span class="p">[</span>
<span class="n">sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">catalyst</span><span class="o">.</span><span class="n">expressions</span><span class="o">.</span><span class="n">NamedArgumentExpression</span><span class="p">(</span>
<span class="n">key</span><span class="p">,</span> <span class="n">_to_java_expr</span><span class="p">(</span><span class="n">value</span><span class="p">)</span>
<span class="p">)</span>
<span class="k">for</span> <span class="n">key</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">items</span><span class="p">()</span>
<span class="p">]</span>
<span class="n">profiler_enabled</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">&quot;spark.python.profile&quot;</span><span class="p">,</span> <span class="s2">&quot;false&quot;</span><span class="p">)</span> <span class="o">==</span> <span class="s2">&quot;true&quot;</span>
<span class="n">memory_profiler_enabled</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">&quot;spark.python.profile.memory&quot;</span><span class="p">,</span> <span class="s2">&quot;false&quot;</span><span class="p">)</span> <span class="o">==</span> <span class="s2">&quot;true&quot;</span>
<span class="k">if</span> <span class="n">profiler_enabled</span> <span class="ow">or</span> <span class="n">memory_profiler_enabled</span><span class="p">:</span>
<span class="c1"># Disable profiling Pandas UDFs with iterators as input/output.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">evalType</span> <span class="ow">in</span> <span class="p">[</span>
<span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_SCALAR_PANDAS_ITER_UDF</span><span class="p">,</span>
<span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_MAP_PANDAS_ITER_UDF</span><span class="p">,</span>
<span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_MAP_ARROW_ITER_UDF</span><span class="p">,</span>
<span class="p">]:</span>
<span class="n">warnings</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span>
<span class="s2">&quot;Profiling UDFs with iterators input/output is not supported.&quot;</span><span class="p">,</span>
<span class="ne">UserWarning</span><span class="p">,</span>
<span class="p">)</span>
<span class="n">judf</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_judf</span>
<span class="n">jUDFExpr</span> <span class="o">=</span> <span class="n">judf</span><span class="o">.</span><span class="n">builder</span><span class="p">(</span><span class="n">_to_seq</span><span class="p">(</span><span class="n">sc</span><span class="p">,</span> <span class="n">jexprs</span><span class="p">))</span>
<span class="n">jPythonUDF</span> <span class="o">=</span> <span class="n">judf</span><span class="o">.</span><span class="n">fromUDFExpr</span><span class="p">(</span><span class="n">jUDFExpr</span><span class="p">)</span>
<span class="k">return</span> <span class="n">Column</span><span class="p">(</span><span class="n">jPythonUDF</span><span class="p">)</span>
<span class="c1"># Disallow enabling two profilers at the same time.</span>
<span class="k">if</span> <span class="n">profiler_enabled</span> <span class="ow">and</span> <span class="n">memory_profiler_enabled</span><span class="p">:</span>
<span class="c1"># When both profilers are enabled, they interfere with each other,</span>
<span class="c1"># that makes the result profile misleading.</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;CANNOT_SET_TOGETHER&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;arg_list&quot;</span><span class="p">:</span> <span class="s2">&quot;&#39;spark.python.profile&#39; and &quot;</span>
<span class="s2">&quot;&#39;spark.python.profile.memory&#39; configuration&quot;</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="n">profiler_enabled</span><span class="p">:</span>
<span class="n">f</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">func</span>
<span class="n">profiler</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">profiler_collector</span><span class="o">.</span><span class="n">new_udf_profiler</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span>
<span class="nd">@functools</span><span class="o">.</span><span class="n">wraps</span><span class="p">(</span><span class="n">f</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">func</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="n">Any</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Any</span><span class="p">:</span>
<span class="k">assert</span> <span class="n">profiler</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="k">return</span> <span class="n">profiler</span><span class="o">.</span><span class="n">profile</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="n">func</span><span class="o">.</span><span class="n">__signature__</span> <span class="o">=</span> <span class="n">inspect</span><span class="o">.</span><span class="n">signature</span><span class="p">(</span><span class="n">f</span><span class="p">)</span> <span class="c1"># type: ignore[attr-defined]</span>
<span class="n">judf</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_create_judf</span><span class="p">(</span><span class="n">func</span><span class="p">)</span>
<span class="n">jUDFExpr</span> <span class="o">=</span> <span class="n">judf</span><span class="o">.</span><span class="n">builder</span><span class="p">(</span><span class="n">_to_seq</span><span class="p">(</span><span class="n">sc</span><span class="p">,</span> <span class="n">jexprs</span><span class="p">))</span>
<span class="n">jPythonUDF</span> <span class="o">=</span> <span class="n">judf</span><span class="o">.</span><span class="n">fromUDFExpr</span><span class="p">(</span><span class="n">jUDFExpr</span><span class="p">)</span>
<span class="nb">id</span> <span class="o">=</span> <span class="n">jUDFExpr</span><span class="o">.</span><span class="n">resultId</span><span class="p">()</span><span class="o">.</span><span class="n">id</span><span class="p">()</span>
<span class="n">sc</span><span class="o">.</span><span class="n">profiler_collector</span><span class="o">.</span><span class="n">add_profiler</span><span class="p">(</span><span class="nb">id</span><span class="p">,</span> <span class="n">profiler</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span> <span class="c1"># memory_profiler_enabled</span>
<span class="n">f</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">func</span>
<span class="n">memory_profiler</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">profiler_collector</span><span class="o">.</span><span class="n">new_memory_profiler</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span>
<span class="p">(</span><span class="n">sub_lines</span><span class="p">,</span> <span class="n">start_line</span><span class="p">)</span> <span class="o">=</span> <span class="n">inspect</span><span class="o">.</span><span class="n">getsourcelines</span><span class="p">(</span><span class="n">f</span><span class="o">.</span><span class="vm">__code__</span><span class="p">)</span>
<span class="nd">@functools</span><span class="o">.</span><span class="n">wraps</span><span class="p">(</span><span class="n">f</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">func</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="n">Any</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Any</span><span class="p">:</span>
<span class="k">assert</span> <span class="n">memory_profiler</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="k">return</span> <span class="n">memory_profiler</span><span class="o">.</span><span class="n">profile</span><span class="p">(</span>
<span class="n">sub_lines</span><span class="p">,</span> <span class="n">start_line</span><span class="p">,</span> <span class="n">f</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span> <span class="c1"># type: ignore[arg-type]</span>
<span class="p">)</span>
<span class="n">func</span><span class="o">.</span><span class="n">__signature__</span> <span class="o">=</span> <span class="n">inspect</span><span class="o">.</span><span class="n">signature</span><span class="p">(</span><span class="n">f</span><span class="p">)</span> <span class="c1"># type: ignore[attr-defined]</span>
<span class="n">judf</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_create_judf</span><span class="p">(</span><span class="n">func</span><span class="p">)</span>
<span class="n">jUDFExpr</span> <span class="o">=</span> <span class="n">judf</span><span class="o">.</span><span class="n">builder</span><span class="p">(</span><span class="n">_to_seq</span><span class="p">(</span><span class="n">sc</span><span class="p">,</span> <span class="n">jexprs</span><span class="p">))</span>
<span class="n">jPythonUDF</span> <span class="o">=</span> <span class="n">judf</span><span class="o">.</span><span class="n">fromUDFExpr</span><span class="p">(</span><span class="n">jUDFExpr</span><span class="p">)</span>
<span class="nb">id</span> <span class="o">=</span> <span class="n">jUDFExpr</span><span class="o">.</span><span class="n">resultId</span><span class="p">()</span><span class="o">.</span><span class="n">id</span><span class="p">()</span>
<span class="n">sc</span><span class="o">.</span><span class="n">profiler_collector</span><span class="o">.</span><span class="n">add_profiler</span><span class="p">(</span><span class="nb">id</span><span class="p">,</span> <span class="n">memory_profiler</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">judf</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_judf</span>
<span class="n">jUDFExpr</span> <span class="o">=</span> <span class="n">judf</span><span class="o">.</span><span class="n">builder</span><span class="p">(</span><span class="n">_to_seq</span><span class="p">(</span><span class="n">sc</span><span class="p">,</span> <span class="n">jexprs</span><span class="p">))</span>
<span class="n">jPythonUDF</span> <span class="o">=</span> <span class="n">judf</span><span class="o">.</span><span class="n">fromUDFExpr</span><span class="p">(</span><span class="n">jUDFExpr</span><span class="p">)</span>
<span class="k">return</span> <span class="n">Column</span><span class="p">(</span><span class="n">jPythonUDF</span><span class="p">)</span>
<span class="c1"># This function is for improving the online help system in the interactive interpreter.</span>
<span class="c1"># For example, the built-in help / pydoc.help. It wraps the UDF with the docstring and</span>
<span class="c1"># argument annotation. (See: SPARK-19161)</span>
<span class="k">def</span> <span class="nf">_wrapped</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;UserDefinedFunctionLike&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Wrap this udf with a function and attach docstring from func</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># It is possible for a callable instance without __name__ attribute or/and</span>
<span class="c1"># __module__ attribute to be wrapped here. For example, functools.partial. In this case,</span>
<span class="c1"># we should avoid wrapping the attributes from the wrapped function to the wrapper</span>
<span class="c1"># function. So, we take out these attribute names from the default names to set and</span>
<span class="c1"># then manually assign it after being wrapped.</span>
<span class="n">assignments</span> <span class="o">=</span> <span class="nb">tuple</span><span class="p">(</span>
<span class="n">a</span> <span class="k">for</span> <span class="n">a</span> <span class="ow">in</span> <span class="n">functools</span><span class="o">.</span><span class="n">WRAPPER_ASSIGNMENTS</span> <span class="k">if</span> <span class="n">a</span> <span class="o">!=</span> <span class="s2">&quot;__name__&quot;</span> <span class="ow">and</span> <span class="n">a</span> <span class="o">!=</span> <span class="s2">&quot;__module__&quot;</span>
<span class="p">)</span>
<span class="nd">@functools</span><span class="o">.</span><span class="n">wraps</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">func</span><span class="p">,</span> <span class="n">assigned</span><span class="o">=</span><span class="n">assignments</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">wrapper</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">:</span> <span class="s2">&quot;ColumnOrName&quot;</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="s2">&quot;ColumnOrName&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Column</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="n">wrapper</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_name</span>
<span class="n">wrapper</span><span class="o">.</span><span class="vm">__module__</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">func</span><span class="o">.</span><span class="vm">__module__</span>
<span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">func</span><span class="p">,</span> <span class="s2">&quot;__module__&quot;</span><span class="p">)</span>
<span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">func</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__module__</span>
<span class="p">)</span>
<span class="n">wrapper</span><span class="o">.</span><span class="n">func</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">func</span> <span class="c1"># type: ignore[attr-defined]</span>
<span class="n">wrapper</span><span class="o">.</span><span class="n">returnType</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">returnType</span> <span class="c1"># type: ignore[attr-defined]</span>
<span class="n">wrapper</span><span class="o">.</span><span class="n">evalType</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">evalType</span> <span class="c1"># type: ignore[attr-defined]</span>
<span class="n">wrapper</span><span class="o">.</span><span class="n">deterministic</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">deterministic</span> <span class="c1"># type: ignore[attr-defined]</span>
<span class="n">wrapper</span><span class="o">.</span><span class="n">asNondeterministic</span> <span class="o">=</span> <span class="n">functools</span><span class="o">.</span><span class="n">wraps</span><span class="p">(</span> <span class="c1"># type: ignore[attr-defined]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">asNondeterministic</span>
<span class="p">)(</span><span class="k">lambda</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">asNondeterministic</span><span class="p">()</span><span class="o">.</span><span class="n">_wrapped</span><span class="p">())</span>
<span class="n">wrapper</span><span class="o">.</span><span class="n">_unwrapped</span> <span class="o">=</span> <span class="bp">self</span> <span class="c1"># type: ignore[attr-defined]</span>
<span class="k">return</span> <span class="n">wrapper</span> <span class="c1"># type: ignore[return-value]</span>
<div class="viewcode-block" id="UserDefinedFunction.asNondeterministic"><a class="viewcode-back" href="../../../reference/pyspark.sql/api/pyspark.sql.udf.UserDefinedFunction.asNondeterministic.html#pyspark.sql.UserDefinedFunction.asNondeterministic">[docs]</a> <span class="k">def</span> <span class="nf">asNondeterministic</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;UserDefinedFunction&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Updates UserDefinedFunction to nondeterministic.</span>
<span class="sd"> .. versionadded:: 2.3</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Here, we explicitly clean the cache to create a JVM UDF instance</span>
<span class="c1"># with &#39;deterministic&#39; updated. See SPARK-23233.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_judf_placeholder</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">deterministic</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">return</span> <span class="bp">self</span></div></div>
<div class="viewcode-block" id="UDFRegistration"><a class="viewcode-back" href="../../../reference/pyspark.sql/api/pyspark.sql.UDFRegistration.html#pyspark.sql.UDFRegistration">[docs]</a><span class="k">class</span> <span class="nc">UDFRegistration</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Wrapper for user-defined function registration. This instance can be accessed by</span>
<span class="sd"> :attr:`spark.udf` or :attr:`sqlContext.udf`.</span>
<span class="sd"> .. versionadded:: 1.3.1</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sparkSession</span><span class="p">:</span> <span class="s2">&quot;SparkSession&quot;</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sparkSession</span> <span class="o">=</span> <span class="n">sparkSession</span>
<div class="viewcode-block" id="UDFRegistration.register"><a class="viewcode-back" href="../../../reference/pyspark.sql/api/pyspark.sql.UDFRegistration.register.html#pyspark.sql.UDFRegistration.register">[docs]</a> <span class="k">def</span> <span class="nf">register</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">f</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">Callable</span><span class="p">[</span><span class="o">...</span><span class="p">,</span> <span class="n">Any</span><span class="p">],</span> <span class="s2">&quot;UserDefinedFunctionLike&quot;</span><span class="p">],</span>
<span class="n">returnType</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="s2">&quot;DataTypeOrString&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;UserDefinedFunctionLike&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Register a Python function (including lambda function) or a user-defined function</span>
<span class="sd"> as a SQL function.</span>
<span class="sd"> .. versionadded:: 1.3.1</span>
<span class="sd"> .. versionchanged:: 3.4.0</span>
<span class="sd"> Supports Spark Connect.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> name : str,</span>
<span class="sd"> name of the user-defined function in SQL statements.</span>
<span class="sd"> f : function, :meth:`pyspark.sql.functions.udf` or :meth:`pyspark.sql.functions.pandas_udf`</span>
<span class="sd"> a Python function, or a user-defined function. The user-defined function can</span>
<span class="sd"> be either row-at-a-time or vectorized. See :meth:`pyspark.sql.functions.udf` and</span>
<span class="sd"> :meth:`pyspark.sql.functions.pandas_udf`.</span>
<span class="sd"> returnType : :class:`pyspark.sql.types.DataType` or str, optional</span>
<span class="sd"> the return type of the registered user-defined function. The value can</span>
<span class="sd"> be either a :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.</span>
<span class="sd"> `returnType` can be optionally specified when `f` is a Python function but not</span>
<span class="sd"> when `f` is a user-defined function. Please see the examples below.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> function</span>
<span class="sd"> a user-defined function</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> To register a nondeterministic Python function, users need to first build</span>
<span class="sd"> a nondeterministic user-defined function for the Python function and then register it</span>
<span class="sd"> as a SQL function.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> 1. When `f` is a Python function:</span>
<span class="sd"> `returnType` defaults to string type and can be optionally specified. The produced</span>
<span class="sd"> object must match the specified type. In this case, this API works as if</span>
<span class="sd"> `register(name, f, returnType=StringType())`.</span>
<span class="sd"> &gt;&gt;&gt; strlen = spark.udf.register(&quot;stringLengthString&quot;, lambda x: len(x))</span>
<span class="sd"> &gt;&gt;&gt; spark.sql(&quot;SELECT stringLengthString(&#39;test&#39;)&quot;).collect()</span>
<span class="sd"> [Row(stringLengthString(test)=&#39;4&#39;)]</span>
<span class="sd"> &gt;&gt;&gt; spark.sql(&quot;SELECT &#39;foo&#39; AS text&quot;).select(strlen(&quot;text&quot;)).collect()</span>
<span class="sd"> [Row(stringLengthString(text)=&#39;3&#39;)]</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.types import IntegerType</span>
<span class="sd"> &gt;&gt;&gt; _ = spark.udf.register(&quot;stringLengthInt&quot;, lambda x: len(x), IntegerType())</span>
<span class="sd"> &gt;&gt;&gt; spark.sql(&quot;SELECT stringLengthInt(&#39;test&#39;)&quot;).collect()</span>
<span class="sd"> [Row(stringLengthInt(test)=4)]</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.types import IntegerType</span>
<span class="sd"> &gt;&gt;&gt; _ = spark.udf.register(&quot;stringLengthInt&quot;, lambda x: len(x), IntegerType())</span>
<span class="sd"> &gt;&gt;&gt; spark.sql(&quot;SELECT stringLengthInt(&#39;test&#39;)&quot;).collect()</span>
<span class="sd"> [Row(stringLengthInt(test)=4)]</span>
<span class="sd"> 2. When `f` is a user-defined function (from Spark 2.3.0):</span>
<span class="sd"> Spark uses the return type of the given user-defined function as the return type of</span>
<span class="sd"> the registered user-defined function. `returnType` should not be specified.</span>
<span class="sd"> In this case, this API works as if `register(name, f)`.</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.types import IntegerType</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.functions import udf</span>
<span class="sd"> &gt;&gt;&gt; slen = udf(lambda s: len(s), IntegerType())</span>
<span class="sd"> &gt;&gt;&gt; _ = spark.udf.register(&quot;slen&quot;, slen)</span>
<span class="sd"> &gt;&gt;&gt; spark.sql(&quot;SELECT slen(&#39;test&#39;)&quot;).collect()</span>
<span class="sd"> [Row(slen(test)=4)]</span>
<span class="sd"> &gt;&gt;&gt; import random</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.functions import udf</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.types import IntegerType</span>
<span class="sd"> &gt;&gt;&gt; random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()</span>
<span class="sd"> &gt;&gt;&gt; new_random_udf = spark.udf.register(&quot;random_udf&quot;, random_udf)</span>
<span class="sd"> &gt;&gt;&gt; spark.sql(&quot;SELECT random_udf()&quot;).collect() # doctest: +SKIP</span>
<span class="sd"> [Row(random_udf()=82)]</span>
<span class="sd"> &gt;&gt;&gt; import pandas as pd # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.functions import pandas_udf</span>
<span class="sd"> &gt;&gt;&gt; @pandas_udf(&quot;integer&quot;) # doctest: +SKIP</span>
<span class="sd"> ... def add_one(s: pd.Series) -&gt; pd.Series:</span>
<span class="sd"> ... return s + 1</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; _ = spark.udf.register(&quot;add_one&quot;, add_one) # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; spark.sql(&quot;SELECT add_one(id) FROM range(3)&quot;).collect() # doctest: +SKIP</span>
<span class="sd"> [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]</span>
<span class="sd"> &gt;&gt;&gt; @pandas_udf(&quot;integer&quot;) # doctest: +SKIP</span>
<span class="sd"> ... def sum_udf(v: pd.Series) -&gt; int:</span>
<span class="sd"> ... return v.sum()</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; _ = spark.udf.register(&quot;sum_udf&quot;, sum_udf) # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; q = &quot;SELECT sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2&quot;</span>
<span class="sd"> &gt;&gt;&gt; spark.sql(q).collect() # doctest: +SKIP</span>
<span class="sd"> [Row(sum_udf(v1)=1), Row(sum_udf(v1)=5)]</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># This is to check whether the input function is from a user-defined function or</span>
<span class="c1"># Python function.</span>
<span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="s2">&quot;asNondeterministic&quot;</span><span class="p">):</span>
<span class="k">if</span> <span class="n">returnType</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkTypeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;CANNOT_SPECIFY_RETURN_TYPE_FOR_UDF&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;arg_name&quot;</span><span class="p">:</span> <span class="s2">&quot;f&quot;</span><span class="p">,</span> <span class="s2">&quot;return_type&quot;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">returnType</span><span class="p">)},</span>
<span class="p">)</span>
<span class="n">f</span> <span class="o">=</span> <span class="n">cast</span><span class="p">(</span><span class="s2">&quot;UserDefinedFunctionLike&quot;</span><span class="p">,</span> <span class="n">f</span><span class="p">)</span>
<span class="k">if</span> <span class="n">f</span><span class="o">.</span><span class="n">evalType</span> <span class="ow">not</span> <span class="ow">in</span> <span class="p">[</span>
<span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_BATCHED_UDF</span><span class="p">,</span>
<span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_ARROW_BATCHED_UDF</span><span class="p">,</span>
<span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_SCALAR_PANDAS_UDF</span><span class="p">,</span>
<span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_SCALAR_PANDAS_ITER_UDF</span><span class="p">,</span>
<span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_GROUPED_AGG_PANDAS_UDF</span><span class="p">,</span>
<span class="p">]:</span>
<span class="k">raise</span> <span class="n">PySparkTypeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_UDF_EVAL_TYPE&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;eval_type&quot;</span><span class="p">:</span> <span class="s2">&quot;SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, &quot;</span>
<span class="s2">&quot;SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_PANDAS_ITER_UDF or &quot;</span>
<span class="s2">&quot;SQL_GROUPED_AGG_PANDAS_UDF&quot;</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="n">source_udf</span> <span class="o">=</span> <span class="n">_create_udf</span><span class="p">(</span>
<span class="n">f</span><span class="o">.</span><span class="n">func</span><span class="p">,</span>
<span class="n">returnType</span><span class="o">=</span><span class="n">f</span><span class="o">.</span><span class="n">returnType</span><span class="p">,</span>
<span class="n">name</span><span class="o">=</span><span class="n">name</span><span class="p">,</span>
<span class="n">evalType</span><span class="o">=</span><span class="n">f</span><span class="o">.</span><span class="n">evalType</span><span class="p">,</span>
<span class="n">deterministic</span><span class="o">=</span><span class="n">f</span><span class="o">.</span><span class="n">deterministic</span><span class="p">,</span>
<span class="p">)</span>
<span class="n">register_udf</span> <span class="o">=</span> <span class="n">source_udf</span><span class="o">.</span><span class="n">_unwrapped</span> <span class="c1"># type: ignore[attr-defined]</span>
<span class="n">return_udf</span> <span class="o">=</span> <span class="n">register_udf</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">if</span> <span class="n">returnType</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">returnType</span> <span class="o">=</span> <span class="n">StringType</span><span class="p">()</span>
<span class="n">return_udf</span> <span class="o">=</span> <span class="n">_create_udf</span><span class="p">(</span>
<span class="n">f</span><span class="p">,</span> <span class="n">returnType</span><span class="o">=</span><span class="n">returnType</span><span class="p">,</span> <span class="n">evalType</span><span class="o">=</span><span class="n">PythonEvalType</span><span class="o">.</span><span class="n">SQL_BATCHED_UDF</span><span class="p">,</span> <span class="n">name</span><span class="o">=</span><span class="n">name</span>
<span class="p">)</span>
<span class="n">register_udf</span> <span class="o">=</span> <span class="n">return_udf</span><span class="o">.</span><span class="n">_unwrapped</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sparkSession</span><span class="o">.</span><span class="n">_jsparkSession</span><span class="o">.</span><span class="n">udf</span><span class="p">()</span><span class="o">.</span><span class="n">registerPython</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">register_udf</span><span class="o">.</span><span class="n">_judf</span><span class="p">)</span>
<span class="k">return</span> <span class="n">return_udf</span></div>
<div class="viewcode-block" id="UDFRegistration.registerJavaFunction"><a class="viewcode-back" href="../../../reference/pyspark.sql/api/pyspark.sql.UDFRegistration.registerJavaFunction.html#pyspark.sql.UDFRegistration.registerJavaFunction">[docs]</a> <span class="k">def</span> <span class="nf">registerJavaFunction</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">javaClassName</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">returnType</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="s2">&quot;DataTypeOrString&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Register a Java user-defined function as a SQL function.</span>
<span class="sd"> In addition to a name and the function itself, the return type can be optionally specified.</span>
<span class="sd"> When the return type is not specified, it is inferred via reflection.</span>
<span class="sd"> .. versionadded:: 2.3.0</span>
<span class="sd"> .. versionchanged:: 3.4.0</span>
<span class="sd"> Supports Spark Connect.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> name : str</span>
<span class="sd"> name of the user-defined function</span>
<span class="sd"> javaClassName : str</span>
<span class="sd"> fully qualified name of java class</span>
<span class="sd"> returnType : :class:`pyspark.sql.types.DataType` or str, optional</span>
<span class="sd"> the return type of the registered Java function. The value can be either</span>
<span class="sd"> a :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.types import IntegerType</span>
<span class="sd"> &gt;&gt;&gt; spark.udf.registerJavaFunction(</span>
<span class="sd"> ... &quot;javaStringLength&quot;, &quot;test.org.apache.spark.sql.JavaStringLength&quot;, IntegerType())</span>
<span class="sd"> ... # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; spark.sql(&quot;SELECT javaStringLength(&#39;test&#39;)&quot;).collect() # doctest: +SKIP</span>
<span class="sd"> [Row(javaStringLength(test)=4)]</span>
<span class="sd"> &gt;&gt;&gt; spark.udf.registerJavaFunction(</span>
<span class="sd"> ... &quot;javaStringLength2&quot;, &quot;test.org.apache.spark.sql.JavaStringLength&quot;)</span>
<span class="sd"> ... # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; spark.sql(&quot;SELECT javaStringLength2(&#39;test&#39;)&quot;).collect() # doctest: +SKIP</span>
<span class="sd"> [Row(javaStringLength2(test)=4)]</span>
<span class="sd"> &gt;&gt;&gt; spark.udf.registerJavaFunction(</span>
<span class="sd"> ... &quot;javaStringLength3&quot;, &quot;test.org.apache.spark.sql.JavaStringLength&quot;, &quot;integer&quot;)</span>
<span class="sd"> ... # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; spark.sql(&quot;SELECT javaStringLength3(&#39;test&#39;)&quot;).collect() # doctest: +SKIP</span>
<span class="sd"> [Row(javaStringLength3(test)=4)]</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">jdt</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">returnType</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">returnType</span><span class="p">,</span> <span class="n">DataType</span><span class="p">):</span>
<span class="n">returnType</span> <span class="o">=</span> <span class="n">_parse_datatype_string</span><span class="p">(</span><span class="n">returnType</span><span class="p">)</span>
<span class="n">jdt</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sparkSession</span><span class="o">.</span><span class="n">_jsparkSession</span><span class="o">.</span><span class="n">parseDataType</span><span class="p">(</span><span class="n">returnType</span><span class="o">.</span><span class="n">json</span><span class="p">())</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sparkSession</span><span class="o">.</span><span class="n">_jsparkSession</span><span class="o">.</span><span class="n">udf</span><span class="p">()</span><span class="o">.</span><span class="n">registerJava</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">javaClassName</span><span class="p">,</span> <span class="n">jdt</span><span class="p">)</span></div>
<div class="viewcode-block" id="UDFRegistration.registerJavaUDAF"><a class="viewcode-back" href="../../../reference/pyspark.sql/api/pyspark.sql.UDFRegistration.registerJavaUDAF.html#pyspark.sql.UDFRegistration.registerJavaUDAF">[docs]</a> <span class="k">def</span> <span class="nf">registerJavaUDAF</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">javaClassName</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Register a Java user-defined aggregate function as a SQL function.</span>
<span class="sd"> .. versionadded:: 2.3.0</span>
<span class="sd"> .. versionchanged:: 3.4.0</span>
<span class="sd"> Supports Spark Connect.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> name : str</span>
<span class="sd"> name of the user-defined aggregate function</span>
<span class="sd"> javaClassName : str</span>
<span class="sd"> fully qualified name of java class</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; spark.udf.registerJavaUDAF(&quot;javaUDAF&quot;, &quot;test.org.apache.spark.sql.MyDoubleAvg&quot;)</span>
<span class="sd"> ... # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; df = spark.createDataFrame([(1, &quot;a&quot;),(2, &quot;b&quot;), (3, &quot;a&quot;)],[&quot;id&quot;, &quot;name&quot;])</span>
<span class="sd"> &gt;&gt;&gt; df.createOrReplaceTempView(&quot;df&quot;)</span>
<span class="sd"> &gt;&gt;&gt; q = &quot;SELECT name, javaUDAF(id) as avg from df group by name order by name desc&quot;</span>
<span class="sd"> &gt;&gt;&gt; spark.sql(q).collect() # doctest: +SKIP</span>
<span class="sd"> [Row(name=&#39;b&#39;, avg=102.0), Row(name=&#39;a&#39;, avg=102.0)]</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sparkSession</span><span class="o">.</span><span class="n">_jsparkSession</span><span class="o">.</span><span class="n">udf</span><span class="p">()</span><span class="o">.</span><span class="n">registerJavaUDAF</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">javaClassName</span><span class="p">)</span></div></div>
<span class="k">def</span> <span class="nf">_test</span><span class="p">()</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">doctest</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span>
<span class="kn">import</span> <span class="nn">pyspark.sql.udf</span>
<span class="n">globs</span> <span class="o">=</span> <span class="n">pyspark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">udf</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span><span class="o">.</span><span class="n">builder</span><span class="o">.</span><span class="n">master</span><span class="p">(</span><span class="s2">&quot;local[4]&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">appName</span><span class="p">(</span><span class="s2">&quot;sql.udf tests&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">getOrCreate</span><span class="p">()</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;spark&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">spark</span>
<span class="p">(</span><span class="n">failure_count</span><span class="p">,</span> <span class="n">test_count</span><span class="p">)</span> <span class="o">=</span> <span class="n">doctest</span><span class="o">.</span><span class="n">testmod</span><span class="p">(</span>
<span class="n">pyspark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">udf</span><span class="p">,</span> <span class="n">globs</span><span class="o">=</span><span class="n">globs</span><span class="p">,</span> <span class="n">optionflags</span><span class="o">=</span><span class="n">doctest</span><span class="o">.</span><span class="n">ELLIPSIS</span> <span class="o">|</span> <span class="n">doctest</span><span class="o">.</span><span class="n">NORMALIZE_WHITESPACE</span>
<span class="p">)</span>
<span class="n">spark</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="k">if</span> <span class="n">failure_count</span><span class="p">:</span>
<span class="n">sys</span><span class="o">.</span><span class="n">exit</span><span class="p">(</span><span class="o">-</span><span class="mi">1</span><span class="p">)</span>
<span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">&quot;__main__&quot;</span><span class="p">:</span>
<span class="n">_test</span><span class="p">()</span>
</pre></div>
</article>
<footer class="bd-footer-article">
<div class="footer-article-items footer-article__inner">
<div class="footer-article-item"><!-- Previous / next buttons -->
<div class="prev-next-area">
</div></div>
</div>
</footer>
</div>
</div>
<footer class="bd-footer-content">
</footer>
</main>
</div>
</div>
<!-- Scripts loaded after <body> so the DOM is not blocked -->
<script src="../../../_static/scripts/bootstrap.js?digest=e353d410970836974a52"></script>
<script src="../../../_static/scripts/pydata-sphinx-theme.js?digest=e353d410970836974a52"></script>
<footer class="bd-footer">
<div class="bd-footer__inner bd-page-width">
<div class="footer-items__start">
<div class="footer-item"><p class="copyright">
Copyright © 2024 The Apache Software Foundation, Licensed under the <a href="https://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
</p></div>
<div class="footer-item">
<p class="sphinx-version">
Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 4.5.0.
<br>
</p>
</div>
</div>
<div class="footer-items__end">
<div class="footer-item"><p class="theme-version">
Built with the <a href="https://pydata-sphinx-theme.readthedocs.io/en/stable/index.html">PyData Sphinx Theme</a> 0.13.3.
</p></div>
</div>
</div>
</footer>
</body>
</html>