<!DOCTYPE html>
<html >
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /><meta name="generator" content="Docutils 0.17.1: http://docutils.sourceforge.net/" />
<title>Apache Arrow in PySpark &#8212; PySpark 4.0.0-preview1 documentation</title>
<link rel="canonical" href="https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html" />
<link rel="search" title="Search" href="../../search.html" />
<link rel="next" title="Python User-defined Table Functions (UDTFs)" href="python_udtf.html" />
<link rel="prev" title="Spark SQL" href="index.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="None">
<!-- Matomo -->
<script type="text/javascript">
var _paq = window._paq = window._paq || [];
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
_paq.push(["disableCookies"]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '40']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
<!-- End Matomo Code -->
</head>
<body>
<nav class="bd-header navbar navbar-expand-lg bd-navbar">
<div class="bd-header__inner bd-page-width">
<label class="sidebar-toggle primary-toggle" for="__primary">
<span class="fa-solid fa-bars"></span>
</label>
<div class="navbar-header-items__start">
<div class="navbar-item">
<a class="navbar-brand logo" href="../../index.html">
<img src="../../_static/spark-logo-light.png" class="logo__image only-light" alt="Logo image"/>
<script>document.write(`<img src="../../_static/spark-logo-dark.png" class="logo__image only-dark" alt="Logo image"/>`);</script>
</a></div>
</div>
<div class="col-lg-9 navbar-header-items">
<div class="me-auto navbar-header-items__center">
<div class="navbar-item"><nav class="navbar-nav">
<p class="sidebar-header-items__title"
role="heading"
aria-level="1"
aria-label="Site Navigation">
Site Navigation
</p>
<ul class="bd-navbar-elements navbar-nav">
<li class="nav-item">
<a class="nav-link nav-internal" href="../../index.html">
Overview
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="nav-item current active">
<a class="nav-link nav-internal" href="../index.html">
User Guides
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../reference/index.html">
API Reference
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../development/index.html">
Development
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</nav></div>
</div>
<div class="navbar-header-items__end">
<div class="navbar-item navbar-persistent--container">
<script>
document.write(`
<button class="btn btn-sm navbar-btn search-button search-button__button" title="Search" aria-label="Search" data-bs-placement="bottom" data-bs-toggle="tooltip">
<i class="fa-solid fa-magnifying-glass"></i>
</button>
`);
</script>
</div>
<div class="navbar-item"><!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
<button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-toggle="dropdown">
4.0.0-preview1
<span class="caret"></span>
</button>
<div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
<!-- dropdown will be populated by javascript on page load -->
</div>
</div>
<script type="text/javascript">
// Function to construct the target URL from the JSON components
function buildURL(entry) {
var template = "https://spark.apache.org/docs/{version}/api/python/index.html"; // supplied by jinja
template = template.replace("{version}", entry.version);
return template;
}
// Function to check if corresponding page path exists in other version of docs
// and, if so, go there instead of the homepage of the other docs version
function checkPageExistsAndRedirect(event) {
const currentFilePath = "user_guide/sql/arrow_pandas.html",
otherDocsHomepage = event.target.getAttribute("href");
let tryUrl = `${otherDocsHomepage}${currentFilePath}`;
$.ajax({
type: 'HEAD',
url: tryUrl,
// if the page exists, go there
success: function() {
location.href = tryUrl;
}
}).fail(function() {
location.href = otherDocsHomepage;
});
return false;
}
// Function to populate the version switcher
(function () {
// get JSON config
$.getJSON("https://spark.apache.org/static/versions.json", function(data, textStatus, jqXHR) {
// create the nodes first (before AJAX calls) to ensure the order is
// correct (for now, links will go to doc version homepage)
$.each(data, function(index, entry) {
// if no custom name specified (e.g., "latest"), use version string
if (!("name" in entry)) {
entry.name = entry.version;
}
// construct the appropriate URL, and add it to the dropdown
entry.url = buildURL(entry);
const node = document.createElement("a");
node.setAttribute("class", "list-group-item list-group-item-action py-1");
node.setAttribute("href", `${entry.url}`);
node.textContent = `${entry.name}`;
node.onclick = checkPageExistsAndRedirect;
$("#version_switcher").append(node);
});
});
})();
</script></div>
<div class="navbar-item">
<script>
document.write(`
<button class="theme-switch-button btn btn-sm btn-outline-primary navbar-btn rounded-circle" title="light/dark" aria-label="light/dark" data-bs-placement="bottom" data-bs-toggle="tooltip">
<span class="theme-switch" data-mode="light"><i class="fa-solid fa-sun"></i></span>
<span class="theme-switch" data-mode="dark"><i class="fa-solid fa-moon"></i></span>
<span class="theme-switch" data-mode="auto"><i class="fa-solid fa-circle-half-stroke"></i></span>
</button>
`);
</script></div>
<div class="navbar-item"><ul class="navbar-icon-links navbar-nav"
aria-label="Icon Links">
<li class="nav-item">
<a href="https://github.com/apache/spark" title="GitHub" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-brands fa-github"></i></span>
<label class="sr-only">GitHub</label></a>
</li>
<li class="nav-item">
<a href="https://pypi.org/project/pyspark" title="PyPI" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-solid fa-box"></i></span>
<label class="sr-only">PyPI</label></a>
</li>
</ul></div>
</div>
</div>
<div class="navbar-persistent--mobile">
<script>
document.write(`
<button class="btn btn-sm navbar-btn search-button search-button__button" title="Search" aria-label="Search" data-bs-placement="bottom" data-bs-toggle="tooltip">
<i class="fa-solid fa-magnifying-glass"></i>
</button>
`);
</script>
</div>
<label class="sidebar-toggle secondary-toggle" for="__secondary">
<span class="fa-solid fa-outdent"></span>
</label>
</div>
</nav>
<div class="bd-container">
<div class="bd-container__inner bd-page-width">
<div class="bd-sidebar-primary bd-sidebar">
<div class="sidebar-header-items sidebar-primary__section">
<div class="sidebar-header-items__center">
<div class="navbar-item"><nav class="navbar-nav">
<p class="sidebar-header-items__title"
role="heading"
aria-level="1"
aria-label="Site Navigation">
Site Navigation
</p>
<ul class="bd-navbar-elements navbar-nav">
<li class="nav-item">
<a class="nav-link nav-internal" href="../../index.html">
Overview
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="nav-item current active">
<a class="nav-link nav-internal" href="../index.html">
User Guides
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../reference/index.html">
API Reference
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../development/index.html">
Development
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</nav></div>
</div>
<div class="sidebar-header-items__end">
<div class="navbar-item"><!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
<button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-toggle="dropdown">
4.0.0-preview1
<span class="caret"></span>
</button>
<div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
<!-- dropdown will be populated by javascript on page load -->
</div>
</div>
<script type="text/javascript">
// Function to construct the target URL from the JSON components
function buildURL(entry) {
var template = "https://spark.apache.org/docs/{version}/api/python/index.html"; // supplied by jinja
template = template.replace("{version}", entry.version);
return template;
}
// Function to check if corresponding page path exists in other version of docs
// and, if so, go there instead of the homepage of the other docs version
function checkPageExistsAndRedirect(event) {
const currentFilePath = "user_guide/sql/arrow_pandas.html",
otherDocsHomepage = event.target.getAttribute("href");
let tryUrl = `${otherDocsHomepage}${currentFilePath}`;
$.ajax({
type: 'HEAD',
url: tryUrl,
// if the page exists, go there
success: function() {
location.href = tryUrl;
}
}).fail(function() {
location.href = otherDocsHomepage;
});
return false;
}
// Function to populate the version switcher
(function () {
// get JSON config
$.getJSON("https://spark.apache.org/static/versions.json", function(data, textStatus, jqXHR) {
// create the nodes first (before AJAX calls) to ensure the order is
// correct (for now, links will go to doc version homepage)
$.each(data, function(index, entry) {
// if no custom name specified (e.g., "latest"), use version string
if (!("name" in entry)) {
entry.name = entry.version;
}
// construct the appropriate URL, and add it to the dropdown
entry.url = buildURL(entry);
const node = document.createElement("a");
node.setAttribute("class", "list-group-item list-group-item-action py-1");
node.setAttribute("href", `${entry.url}`);
node.textContent = `${entry.name}`;
node.onclick = checkPageExistsAndRedirect;
$("#version_switcher").append(node);
});
});
})();
</script></div>
<div class="navbar-item">
<script>
document.write(`
<button class="theme-switch-button btn btn-sm btn-outline-primary navbar-btn rounded-circle" title="light/dark" aria-label="light/dark" data-bs-placement="bottom" data-bs-toggle="tooltip">
<span class="theme-switch" data-mode="light"><i class="fa-solid fa-sun"></i></span>
<span class="theme-switch" data-mode="dark"><i class="fa-solid fa-moon"></i></span>
<span class="theme-switch" data-mode="auto"><i class="fa-solid fa-circle-half-stroke"></i></span>
</button>
`);
</script></div>
<div class="navbar-item"><ul class="navbar-icon-links navbar-nav"
aria-label="Icon Links">
<li class="nav-item">
<a href="https://github.com/apache/spark" title="GitHub" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-brands fa-github"></i></span>
<label class="sr-only">GitHub</label></a>
</li>
<li class="nav-item">
<a href="https://pypi.org/project/pyspark" title="PyPI" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-solid fa-box"></i></span>
<label class="sr-only">PyPI</label></a>
</li>
</ul></div>
</div>
</div>
<div class="sidebar-primary-items__start sidebar-primary__section">
<div class="sidebar-primary-item"><nav class="bd-docs-nav bd-links"
aria-label="Section Navigation">
<p class="bd-links__title" role="heading" aria-level="1">Section Navigation</p>
<div class="bd-toc-item navbar-nav"><ul class="current nav bd-sidenav">
<li class="toctree-l1"><a class="reference internal" href="../python_packaging.html">Python Package Management</a></li>
<li class="toctree-l1 current active has-children"><a class="reference internal" href="index.html">Spark SQL</a><input checked="" class="toctree-checkbox" id="toctree-checkbox-1" name="toctree-checkbox-1" type="checkbox"/><label class="toctree-toggle" for="toctree-checkbox-1"><i class="fa-solid fa-chevron-down"></i></label><ul class="current">
<li class="toctree-l2 current active"><a class="current reference internal" href="#">Apache Arrow in PySpark</a></li>
<li class="toctree-l2"><a class="reference internal" href="python_udtf.html">Python User-defined Table Functions (UDTFs)</a></li>
<li class="toctree-l2"><a class="reference internal" href="python_data_source.html">Python Data Source API</a></li>
<li class="toctree-l2"><a class="reference internal" href="type_conversions.html">Python to Spark Type Conversions</a></li>
</ul>
</li>
<li class="toctree-l1 has-children"><a class="reference internal" href="../pandas_on_spark/index.html">Pandas API on Spark</a><input class="toctree-checkbox" id="toctree-checkbox-2" name="toctree-checkbox-2" type="checkbox"/><label class="toctree-toggle" for="toctree-checkbox-2"><i class="fa-solid fa-chevron-down"></i></label><ul>
<li class="toctree-l2"><a class="reference internal" href="../pandas_on_spark/options.html">Options and settings</a></li>
<li class="toctree-l2"><a class="reference internal" href="../pandas_on_spark/pandas_pyspark.html">From/to pandas and PySpark DataFrames</a></li>
<li class="toctree-l2"><a class="reference internal" href="../pandas_on_spark/transform_apply.html">Transform and apply a function</a></li>
<li class="toctree-l2"><a class="reference internal" href="../pandas_on_spark/types.html">Type Support in Pandas API on Spark</a></li>
<li class="toctree-l2"><a class="reference internal" href="../pandas_on_spark/typehints.html">Type Hints in Pandas API on Spark</a></li>
<li class="toctree-l2"><a class="reference internal" href="../pandas_on_spark/from_to_dbms.html">From/to other DBMSes</a></li>
<li class="toctree-l2"><a class="reference internal" href="../pandas_on_spark/best_practices.html">Best Practices</a></li>
<li class="toctree-l2"><a class="reference internal" href="../pandas_on_spark/supported_pandas_api.html">Supported pandas API</a></li>
<li class="toctree-l2"><a class="reference internal" href="../pandas_on_spark/faq.html">FAQ</a></li>
</ul>
</li>
</ul>
</div>
</nav></div>
</div>
<div class="sidebar-primary-items__end sidebar-primary__section">
</div>
<div id="rtd-footer-container"></div>
</div>
<main id="main-content" class="bd-main">
<div class="bd-content">
<div class="bd-article-container">
<div class="bd-header-article">
<div class="header-article-items header-article__inner">
<div class="header-article-items__start">
<div class="header-article-item">
<nav aria-label="Breadcrumbs">
<ul class="bd-breadcrumbs" role="navigation" aria-label="Breadcrumb">
<li class="breadcrumb-item breadcrumb-home">
<a href="../../index.html" class="nav-link" aria-label="Home">
<i class="fa-solid fa-home"></i>
</a>
</li>
<li class="breadcrumb-item"><a href="../index.html" class="nav-link">User Guides</a></li>
<li class="breadcrumb-item"><a href="index.html" class="nav-link">Spark SQL</a></li>
<li class="breadcrumb-item active" aria-current="page">Apache Arrow in PySpark</li>
</ul>
</nav>
</div>
</div>
</div>
</div>
<div id="searchbox"></div>
<article class="bd-article" role="main">
<section id="apache-arrow-in-pyspark">
<h1>Apache Arrow in PySpark<a class="headerlink" href="#apache-arrow-in-pyspark" title="Permalink to this headline">#</a></h1>
<p>Apache Arrow is an in-memory columnar data format used in Spark to efficiently transfer
data between JVM and Python processes. It currently benefits Python users who work with
Pandas/NumPy data the most. Its usage is not automatic and may require minor changes to
configuration or code to take full advantage and ensure compatibility. This guide gives a
high-level description of how to use Arrow in Spark and highlights the differences when
working with Arrow-enabled data.</p>
<section id="ensure-pyarrow-installed">
<h2>Ensure PyArrow Installed<a class="headerlink" href="#ensure-pyarrow-installed" title="Permalink to this headline">#</a></h2>
<p>To use Apache Arrow in PySpark, <a class="reference internal" href="#recommended-pandas-and-pyarrow-versions"><span class="std std-ref">the recommended version of PyArrow</span></a>
should be installed.
If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the
SQL module with the command <code class="docutils literal notranslate"><span class="pre">pip</span> <span class="pre">install</span> <span class="pre">pyspark[sql]</span></code>. Otherwise, you must ensure that PyArrow
is installed and available on all cluster nodes.
You can install it using pip or conda from the conda-forge channel. See PyArrow
<a class="reference external" href="https://arrow.apache.org/docs/python/install.html">installation</a> for details.</p>
</section>
<section id="conversion-to-arrow-table">
<h2>Conversion to Arrow Table<a class="headerlink" href="#conversion-to-arrow-table" title="Permalink to this headline">#</a></h2>
<p>You can call <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.toArrow.html#pyspark.sql.DataFrame.toArrow" title="pyspark.sql.DataFrame.toArrow"><code class="xref py py-meth docutils literal notranslate"><span class="pre">DataFrame.toArrow()</span></code></a> to convert a Spark DataFrame to a PyArrow Table.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">pyarrow</span> <span class="k">as</span> <span class="nn">pa</span> <span class="c1"># noqa: F401</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">rand</span>
<span class="c1"># Create a Spark DataFrame</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">range</span><span class="p">(</span><span class="mi">100</span><span class="p">)</span><span class="o">.</span><span class="n">drop</span><span class="p">(</span><span class="s2">&quot;id&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">withColumns</span><span class="p">({</span><span class="s2">&quot;0&quot;</span><span class="p">:</span> <span class="n">rand</span><span class="p">(),</span> <span class="s2">&quot;1&quot;</span><span class="p">:</span> <span class="n">rand</span><span class="p">(),</span> <span class="s2">&quot;2&quot;</span><span class="p">:</span> <span class="n">rand</span><span class="p">()})</span>
<span class="c1"># Convert the Spark DataFrame to a PyArrow Table</span>
<span class="n">table</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s2">&quot;*&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">toArrow</span><span class="p">()</span>
<span class="nb">print</span><span class="p">(</span><span class="n">table</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span>
<span class="c1"># 0: double not null</span>
<span class="c1"># 1: double not null</span>
<span class="c1"># 2: double not null</span>
</pre></div>
</div>
<p>Note that <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.toArrow.html#pyspark.sql.DataFrame.toArrow" title="pyspark.sql.DataFrame.toArrow"><code class="xref py py-meth docutils literal notranslate"><span class="pre">DataFrame.toArrow()</span></code></a> results in the collection of all records in the DataFrame to
the driver program and should be done on a small subset of the data. Not all Spark data types are
currently supported and an error can be raised if a column has an unsupported type.</p>
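<p>If you only need to inspect a sample, one option (a hedged sketch, not from the original example) is to
bound the result before converting, so the driver never receives the full dataset:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre># Illustrative only: cap the number of rows collected to the driver before
# converting to a PyArrow Table. The limit of 1000 is an arbitrary example value.
small_table = df.limit(1000).toArrow()
print(small_table.num_rows)
</pre></div>
</div>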
</section>
<section id="enabling-for-conversion-to-from-pandas">
<h2>Enabling for Conversion to/from Pandas<a class="headerlink" href="#enabling-for-conversion-to-from-pandas" title="Permalink to this headline">#</a></h2>
<p>Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
using the call <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.toPandas.html#pyspark.sql.DataFrame.toPandas" title="pyspark.sql.DataFrame.toPandas"><code class="xref py py-meth docutils literal notranslate"><span class="pre">DataFrame.toPandas()</span></code></a> and when creating a Spark DataFrame from a Pandas DataFrame with
<a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.SparkSession.createDataFrame.html#pyspark.sql.SparkSession.createDataFrame" title="pyspark.sql.SparkSession.createDataFrame"><code class="xref py py-meth docutils literal notranslate"><span class="pre">SparkSession.createDataFrame()</span></code></a>. To use Arrow when executing these calls, users need to first set
the Spark configuration <code class="docutils literal notranslate"><span class="pre">spark.sql.execution.arrow.pyspark.enabled</span></code> to <code class="docutils literal notranslate"><span class="pre">true</span></code>. This is disabled by default.</p>
<p>In addition, the optimizations enabled by <code class="docutils literal notranslate"><span class="pre">spark.sql.execution.arrow.pyspark.enabled</span></code> can automatically fall back
to a non-Arrow implementation if an error occurs before the actual computation within Spark.
This behavior is controlled by <code class="docutils literal notranslate"><span class="pre">spark.sql.execution.arrow.pyspark.fallback.enabled</span></code>.</p>
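<p>For example (a minimal sketch, assuming you want errors to surface rather than be hidden by the
fallback), you might disable the automatic fallback while debugging Arrow-related type issues:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre># Illustrative configuration only: enable Arrow, but fail loudly instead of
# silently falling back to the non-Arrow path when a conversion error occurs.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
</pre></div>
</div>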
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">numpy</span> <span class="k">as</span> <span class="nn">np</span>
<span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span>
<span class="c1"># Enable Arrow-based columnar data transfers</span>
<span class="n">spark</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">set</span><span class="p">(</span><span class="s2">&quot;spark.sql.execution.arrow.pyspark.enabled&quot;</span><span class="p">,</span> <span class="s2">&quot;true&quot;</span><span class="p">)</span>
<span class="c1"># Generate a Pandas DataFrame</span>
<span class="n">pdf</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">(</span><span class="n">np</span><span class="o">.</span><span class="n">random</span><span class="o">.</span><span class="n">rand</span><span class="p">(</span><span class="mi">100</span><span class="p">,</span> <span class="mi">3</span><span class="p">))</span>
<span class="c1"># Create a Spark DataFrame from a Pandas DataFrame using Arrow</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">pdf</span><span class="p">)</span>
<span class="c1"># Convert the Spark DataFrame back to a Pandas DataFrame using Arrow</span>
<span class="n">result_pdf</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s2">&quot;*&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">toPandas</span><span class="p">()</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">&quot;Pandas DataFrame result statistics:</span><span class="se">\n</span><span class="si">%s</span><span class="se">\n</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="nb">str</span><span class="p">(</span><span class="n">result_pdf</span><span class="o">.</span><span class="n">describe</span><span class="p">()))</span>
</pre></div>
</div>
<p>Using the above optimizations with Arrow will produce the same results as when Arrow is not
enabled.</p>
<p>Note that even with Arrow, <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.toPandas.html#pyspark.sql.DataFrame.toPandas" title="pyspark.sql.DataFrame.toPandas"><code class="xref py py-meth docutils literal notranslate"><span class="pre">DataFrame.toPandas()</span></code></a> collects all records in the
DataFrame to the driver program, so it should only be done on a small subset of the data. Not all Spark
data types are currently supported, and an error can be raised if a column has an unsupported type.
If an error occurs during <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.SparkSession.createDataFrame.html#pyspark.sql.SparkSession.createDataFrame" title="pyspark.sql.SparkSession.createDataFrame"><code class="xref py py-meth docutils literal notranslate"><span class="pre">SparkSession.createDataFrame()</span></code></a>, Spark falls back to creating the
DataFrame without Arrow.</p>
</section>
<section id="pandas-udfs-a-k-a-vectorized-udfs">
<h2>Pandas UDFs (a.k.a. Vectorized UDFs)<a class="headerlink" href="#pandas-udfs-a-k-a-vectorized-udfs" title="Permalink to this headline">#</a></h2>
<p>Pandas UDFs are user-defined functions that Spark executes using
Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas
UDF is defined using <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html#pyspark.sql.functions.pandas_udf" title="pyspark.sql.functions.pandas_udf"><code class="xref py py-meth docutils literal notranslate"><span class="pre">pandas_udf()</span></code></a> as a decorator or by wrapping the function, and no additional
configuration is required. In general, a Pandas UDF behaves like a regular PySpark function API.</p>
<p>Before Spark 3.0, Pandas UDFs were defined with <code class="docutils literal notranslate"><span class="pre">pyspark.sql.functions.PandasUDFType</span></code>. From Spark 3.0
with Python 3.6+, you can also use <a class="reference external" href="https://www.python.org/dev/peps/pep-0484">Python type hints</a>.
Using Python type hints is preferred, and <code class="docutils literal notranslate"><span class="pre">pyspark.sql.functions.PandasUDFType</span></code> will be deprecated in
a future release.</p>
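<p>To illustrate the difference (a hedged sketch; the legacy form is deprecated and shown only for comparison),
the same scalar Pandas UDF can be written in both styles:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre>import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Legacy style (before Spark 3.0): the UDF kind is given via PandasUDFType.
# Recent Spark versions emit a deprecation warning for this form.
@pandas_udf("long", PandasUDFType.SCALAR)
def plus_one_legacy(v):
    return v + 1

# Preferred style (Spark 3.0+): the UDF kind is inferred from Python type hints.
@pandas_udf("long")
def plus_one(v: pd.Series) -&gt; pd.Series:
    return v + 1
</pre></div>
</div>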
<p>Note that the type hint should use <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code> in all cases, with one variant:
<code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code> should be used as the input or output type hint instead when the input
or output column is of <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.types.StructType.html#pyspark.sql.types.StructType" title="pyspark.sql.types.StructType"><code class="xref py py-class docutils literal notranslate"><span class="pre">StructType</span></code></a>. The following example shows a Pandas UDF that takes a long
column, a string column and a struct column, and outputs a struct column. It requires the function to
specify the type hints with <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code> and <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code> as below:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">pandas_udf</span>
<span class="nd">@pandas_udf</span><span class="p">(</span><span class="s2">&quot;col1 string, col2 long&quot;</span><span class="p">)</span> <span class="c1"># type: ignore[call-overload]</span>
<span class="k">def</span> <span class="nf">func</span><span class="p">(</span><span class="n">s1</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">,</span> <span class="n">s2</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">,</span> <span class="n">s3</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">:</span>
<span class="n">s3</span><span class="p">[</span><span class="s1">&#39;col2&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">s1</span> <span class="o">+</span> <span class="n">s2</span><span class="o">.</span><span class="n">str</span><span class="o">.</span><span class="n">len</span><span class="p">()</span>
<span class="k">return</span> <span class="n">s3</span>
<span class="c1"># Create a Spark DataFrame that has three columns including a struct column.</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span>
<span class="p">[[</span><span class="mi">1</span><span class="p">,</span> <span class="s2">&quot;a string&quot;</span><span class="p">,</span> <span class="p">(</span><span class="s2">&quot;a nested string&quot;</span><span class="p">,)]],</span>
<span class="s2">&quot;long_col long, string_col string, struct_col struct&lt;col1:string&gt;&quot;</span><span class="p">)</span>
<span class="n">df</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span>
<span class="c1"># root</span>
<span class="c1"># |-- long_column: long (nullable = true)</span>
<span class="c1"># |-- string_column: string (nullable = true)</span>
<span class="c1"># |-- struct_column: struct (nullable = true)</span>
<span class="c1"># | |-- col1: string (nullable = true)</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">func</span><span class="p">(</span><span class="s2">&quot;long_col&quot;</span><span class="p">,</span> <span class="s2">&quot;string_col&quot;</span><span class="p">,</span> <span class="s2">&quot;struct_col&quot;</span><span class="p">))</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span>
<span class="c1"># |-- func(long_col, string_col, struct_col): struct (nullable = true)</span>
<span class="c1"># | |-- col1: string (nullable = true)</span>
<span class="c1"># | |-- col2: long (nullable = true)</span>
</pre></div>
</div>
<p>The following sections describe the supported combinations of type hints. For simplicity,
the <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code> variant is omitted.</p>
<section id="series-to-series">
<h3>Series to Series<a class="headerlink" href="#series-to-series" title="Permalink to this headline">#</a></h3>
<p>The type hint can be expressed as <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code>, … -&gt; <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code>.</p>
<p>By using <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html#pyspark.sql.functions.pandas_udf" title="pyspark.sql.functions.pandas_udf"><code class="xref py py-func docutils literal notranslate"><span class="pre">pandas_udf()</span></code></a> with the function having such type hints above, it creates a Pandas UDF where the given
function takes one or more <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code> and outputs one <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code>. The output of the function should
always be of the same length as the input. Internally, PySpark will execute a Pandas UDF by splitting
columns into batches and calling the function for each batch as a subset of the data, then concatenating
the results together.</p>
<p>The following example shows how to create this Pandas UDF that computes the product of 2 columns.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">col</span><span class="p">,</span> <span class="n">pandas_udf</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="kn">import</span> <span class="n">LongType</span>
<span class="c1"># Declare the function and create the UDF</span>
<span class="k">def</span> <span class="nf">multiply_func</span><span class="p">(</span><span class="n">a</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">:</span>
<span class="k">return</span> <span class="n">a</span> <span class="o">*</span> <span class="n">b</span>
<span class="n">multiply</span> <span class="o">=</span> <span class="n">pandas_udf</span><span class="p">(</span><span class="n">multiply_func</span><span class="p">,</span> <span class="n">returnType</span><span class="o">=</span><span class="n">LongType</span><span class="p">())</span> <span class="c1"># type: ignore[call-overload]</span>
<span class="c1"># The function for a pandas_udf should be able to execute with local Pandas data</span>
<span class="n">x</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">([</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">])</span>
<span class="nb">print</span><span class="p">(</span><span class="n">multiply_func</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">x</span><span class="p">))</span>
<span class="c1"># 0 1</span>
<span class="c1"># 1 4</span>
<span class="c1"># 2 9</span>
<span class="c1"># dtype: int64</span>
<span class="c1"># Create a Spark DataFrame, &#39;spark&#39; is an existing SparkSession</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s2">&quot;x&quot;</span><span class="p">]))</span>
<span class="c1"># Execute function as a Spark vectorized UDF</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">multiply</span><span class="p">(</span><span class="n">col</span><span class="p">(</span><span class="s2">&quot;x&quot;</span><span class="p">),</span> <span class="n">col</span><span class="p">(</span><span class="s2">&quot;x&quot;</span><span class="p">)))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +-------------------+</span>
<span class="c1"># |multiply_func(x, x)|</span>
<span class="c1"># +-------------------+</span>
<span class="c1"># | 1|</span>
<span class="c1"># | 4|</span>
<span class="c1"># | 9|</span>
<span class="c1"># +-------------------+</span>
</pre></div>
</div>
<p>For detailed usage, please see <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html#pyspark.sql.functions.pandas_udf" title="pyspark.sql.functions.pandas_udf"><code class="xref py py-func docutils literal notranslate"><span class="pre">pandas_udf()</span></code></a>.</p>
</section>
<section id="iterator-of-series-to-iterator-of-series">
<h3>Iterator of Series to Iterator of Series<a class="headerlink" href="#iterator-of-series-to-iterator-of-series" title="Permalink to this headline">#</a></h3>
<p>The type hint can be expressed as <code class="docutils literal notranslate"><span class="pre">Iterator[pandas.Series]</span></code> -&gt; <code class="docutils literal notranslate"><span class="pre">Iterator[pandas.Series]</span></code>.</p>
<p>By using <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html#pyspark.sql.functions.pandas_udf" title="pyspark.sql.functions.pandas_udf"><code class="xref py py-func docutils literal notranslate"><span class="pre">pandas_udf()</span></code></a> with the function having such type hints above, it creates a Pandas UDF where the given
function takes an iterator of <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code> and outputs an iterator of <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code>. The
length of the entire output from the function should be the same length of the entire input; therefore, it can
prefetch the data from the input iterator as long as the lengths are the same.
In this case, the created Pandas UDF requires one input column when the Pandas UDF is called. To use
multiple input columns, a different type hint is required. See Iterator of Multiple Series to Iterator
of Series.</p>
<p>This variant is also useful when the UDF execution requires initializing some state, although internally it works
identically to the Series to Series case. The pseudocode below illustrates the idea.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="nd">@pandas_udf</span><span class="p">(</span><span class="s2">&quot;long&quot;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">calculate</span><span class="p">(</span><span class="n">iterator</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">]:</span>
<span class="c1"># Do some expensive initialization with a state</span>
<span class="n">state</span> <span class="o">=</span> <span class="n">very_expensive_initialization</span><span class="p">()</span>
<span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span>
<span class="c1"># Use that state for the whole iterator.</span>
<span class="k">yield</span> <span class="n">calculate_with_state</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">state</span><span class="p">)</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">calculate</span><span class="p">(</span><span class="s2">&quot;value&quot;</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
</pre></div>
</div>
<p>The following example shows how to create this Pandas UDF:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Iterator</span>
<span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">pandas_udf</span>
<span class="n">pdf</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">([</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">],</span> <span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s2">&quot;x&quot;</span><span class="p">])</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">pdf</span><span class="p">)</span>
<span class="c1"># Declare the function and create the UDF</span>
<span class="nd">@pandas_udf</span><span class="p">(</span><span class="s2">&quot;long&quot;</span><span class="p">)</span> <span class="c1"># type: ignore[call-overload]</span>
<span class="k">def</span> <span class="nf">plus_one</span><span class="p">(</span><span class="n">iterator</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">]:</span>
<span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">x</span> <span class="o">+</span> <span class="mi">1</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">plus_one</span><span class="p">(</span><span class="s2">&quot;x&quot;</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +-----------+</span>
<span class="c1"># |plus_one(x)|</span>
<span class="c1"># +-----------+</span>
<span class="c1"># | 2|</span>
<span class="c1"># | 3|</span>
<span class="c1"># | 4|</span>
<span class="c1"># +-----------+</span>
</pre></div>
</div>
<p>For detailed usage, please see <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html#pyspark.sql.functions.pandas_udf" title="pyspark.sql.functions.pandas_udf"><code class="xref py py-func docutils literal notranslate"><span class="pre">pandas_udf()</span></code></a>.</p>
</section>
<section id="iterator-of-multiple-series-to-iterator-of-series">
<h3>Iterator of Multiple Series to Iterator of Series<a class="headerlink" href="#iterator-of-multiple-series-to-iterator-of-series" title="Permalink to this headline">#</a></h3>
<p>The type hint can be expressed as <code class="docutils literal notranslate"><span class="pre">Iterator[Tuple[pandas.Series,</span> <span class="pre">...]]</span></code> -&gt; <code class="docutils literal notranslate"><span class="pre">Iterator[pandas.Series]</span></code>.</p>
<p>Using <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html#pyspark.sql.functions.pandas_udf" title="pyspark.sql.functions.pandas_udf"><code class="xref py py-func docutils literal notranslate"><span class="pre">pandas_udf()</span></code></a> with a function that has these type hints creates a Pandas UDF where the
given function takes an iterator of a tuple of multiple <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code> and outputs an iterator of <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code>.
In this case, the created Pandas UDF requires as many input columns as there are series in the tuple
when it is called. Otherwise, it has the same characteristics and restrictions as the Iterator of Series
to Iterator of Series case.</p>
<p>The following example shows how to create this Pandas UDF:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Iterator</span><span class="p">,</span> <span class="n">Tuple</span>
<span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">pandas_udf</span>
<span class="n">pdf</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">([</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">],</span> <span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s2">&quot;x&quot;</span><span class="p">])</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">pdf</span><span class="p">)</span>
<span class="c1"># Declare the function and create the UDF</span>
<span class="nd">@pandas_udf</span><span class="p">(</span><span class="s2">&quot;long&quot;</span><span class="p">)</span> <span class="c1"># type: ignore[call-overload]</span>
<span class="k">def</span> <span class="nf">multiply_two_cols</span><span class="p">(</span>
<span class="n">iterator</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">]])</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">]:</span>
<span class="k">for</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">a</span> <span class="o">*</span> <span class="n">b</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">multiply_two_cols</span><span class="p">(</span><span class="s2">&quot;x&quot;</span><span class="p">,</span> <span class="s2">&quot;x&quot;</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +-----------------------+</span>
<span class="c1"># |multiply_two_cols(x, x)|</span>
<span class="c1"># +-----------------------+</span>
<span class="c1"># | 1|</span>
<span class="c1"># | 4|</span>
<span class="c1"># | 9|</span>
<span class="c1"># +-----------------------+</span>
</pre></div>
</div>
<p>For detailed usage, please see <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html#pyspark.sql.functions.pandas_udf" title="pyspark.sql.functions.pandas_udf"><code class="xref py py-func docutils literal notranslate"><span class="pre">pandas_udf()</span></code></a>.</p>
</section>
<section id="series-to-scalar">
<h3>Series to Scalar<a class="headerlink" href="#series-to-scalar" title="Permalink to this headline">#</a></h3>
<p>The type hint can be expressed as <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code>, … -&gt; <code class="docutils literal notranslate"><span class="pre">Any</span></code>.</p>
<p>Using <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html#pyspark.sql.functions.pandas_udf" title="pyspark.sql.functions.pandas_udf"><code class="xref py py-func docutils literal notranslate"><span class="pre">pandas_udf()</span></code></a> with a function that has these type hints creates a Pandas UDF similar
to PySpark’s aggregate functions. The given function takes one or more <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code> and returns a scalar value.
The return type should be a primitive data type, and the returned scalar can be either a Python
primitive type, e.g., <code class="docutils literal notranslate"><span class="pre">int</span></code> or <code class="docutils literal notranslate"><span class="pre">float</span></code>, or a NumPy data type, e.g., <code class="docutils literal notranslate"><span class="pre">numpy.int64</span></code> or <code class="docutils literal notranslate"><span class="pre">numpy.float64</span></code>.
Ideally, <code class="docutils literal notranslate"><span class="pre">Any</span></code> should be replaced with the corresponding specific scalar type.</p>
<p>This type of UDF can also be used with <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.GroupedData.agg.html#pyspark.sql.GroupedData.agg" title="pyspark.sql.GroupedData.agg"><code class="xref py py-meth docutils literal notranslate"><span class="pre">GroupedData.agg()</span></code></a> and <code class="docutils literal notranslate"><span class="pre">Window</span></code>.
It defines an aggregation from one or more <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code> to a scalar value, where each <code class="docutils literal notranslate"><span class="pre">pandas.Series</span></code>
represents a column within the group or window.</p>
<p>Note that this type of UDF does not support partial aggregation, and all data for a group or window
is loaded into memory. Also, only unbounded windows are currently supported with grouped aggregate Pandas
UDFs. The following example shows how to use this type of UDF to compute a mean with group-by
and window operations:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">pandas_udf</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">Window</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span>
<span class="p">[(</span><span class="mi">1</span><span class="p">,</span> <span class="mf">1.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mf">2.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">3.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">5.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">10.0</span><span class="p">)],</span>
<span class="p">(</span><span class="s2">&quot;id&quot;</span><span class="p">,</span> <span class="s2">&quot;v&quot;</span><span class="p">))</span>
<span class="c1"># Declare the function and create the UDF</span>
<span class="nd">@pandas_udf</span><span class="p">(</span><span class="s2">&quot;double&quot;</span><span class="p">)</span> <span class="c1"># type: ignore[call-overload]</span>
<span class="k">def</span> <span class="nf">mean_udf</span><span class="p">(</span><span class="n">v</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">float</span><span class="p">:</span>
<span class="k">return</span> <span class="n">v</span><span class="o">.</span><span class="n">mean</span><span class="p">()</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">mean_udf</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s1">&#39;v&#39;</span><span class="p">]))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +-----------+</span>
<span class="c1"># |mean_udf(v)|</span>
<span class="c1"># +-----------+</span>
<span class="c1"># | 4.2|</span>
<span class="c1"># +-----------+</span>
<span class="n">df</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s2">&quot;id&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">agg</span><span class="p">(</span><span class="n">mean_udf</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s1">&#39;v&#39;</span><span class="p">]))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+-----------+</span>
<span class="c1"># | id|mean_udf(v)|</span>
<span class="c1"># +---+-----------+</span>
<span class="c1"># | 1| 1.5|</span>
<span class="c1"># | 2| 6.0|</span>
<span class="c1"># +---+-----------+</span>
<span class="n">w</span> <span class="o">=</span> <span class="n">Window</span> \
<span class="o">.</span><span class="n">partitionBy</span><span class="p">(</span><span class="s1">&#39;id&#39;</span><span class="p">)</span> \
<span class="o">.</span><span class="n">rowsBetween</span><span class="p">(</span><span class="n">Window</span><span class="o">.</span><span class="n">unboundedPreceding</span><span class="p">,</span> <span class="n">Window</span><span class="o">.</span><span class="n">unboundedFollowing</span><span class="p">)</span>
<span class="n">df</span><span class="o">.</span><span class="n">withColumn</span><span class="p">(</span><span class="s1">&#39;mean_v&#39;</span><span class="p">,</span> <span class="n">mean_udf</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s1">&#39;v&#39;</span><span class="p">])</span><span class="o">.</span><span class="n">over</span><span class="p">(</span><span class="n">w</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+----+------+</span>
<span class="c1"># | id| v|mean_v|</span>
<span class="c1"># +---+----+------+</span>
<span class="c1"># | 1| 1.0| 1.5|</span>
<span class="c1"># | 1| 2.0| 1.5|</span>
<span class="c1"># | 2| 3.0| 6.0|</span>
<span class="c1"># | 2| 5.0| 6.0|</span>
<span class="c1"># | 2|10.0| 6.0|</span>
<span class="c1"># +---+----+------+</span>
</pre></div>
</div>
<p>For detailed usage, please see <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html#pyspark.sql.functions.pandas_udf" title="pyspark.sql.functions.pandas_udf"><code class="xref py py-func docutils literal notranslate"><span class="pre">pandas_udf()</span></code></a>.</p>
</section>
</section>
<section id="pandas-function-apis">
<h2>Pandas Function APIs<a class="headerlink" href="#pandas-function-apis" title="Permalink to this headline">#</a></h2>
<p>Pandas Function APIs can directly apply a Python native function to a whole <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame" title="pyspark.sql.DataFrame"><code class="xref py py-class docutils literal notranslate"><span class="pre">DataFrame</span></code></a> by
using Pandas instances. Internally they work similarly to Pandas UDFs, using Arrow to transfer
data and Pandas to work with the data, which allows vectorized operations. However, a Pandas Function
API behaves as a regular API of the PySpark <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame" title="pyspark.sql.DataFrame"><code class="xref py py-class docutils literal notranslate"><span class="pre">DataFrame</span></code></a> rather than as a <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.Column.html#pyspark.sql.Column" title="pyspark.sql.Column"><code class="xref py py-class docutils literal notranslate"><span class="pre">Column</span></code></a>, and Python type hints in Pandas
Function APIs are currently optional and do not affect how the API works internally, although they
might become required in the future.</p>
<p>Since Spark 3.0, the grouped map Pandas UDF has been categorized as a separate Pandas Function API,
<code class="docutils literal notranslate"><span class="pre">DataFrame.groupby().applyInPandas()</span></code>. It is still possible to use it with <code class="docutils literal notranslate"><span class="pre">pyspark.sql.functions.PandasUDFType</span></code>
and <code class="docutils literal notranslate"><span class="pre">DataFrame.groupby().apply()</span></code> as before; however, it is preferred to call
<code class="docutils literal notranslate"><span class="pre">DataFrame.groupby().applyInPandas()</span></code> directly. <code class="docutils literal notranslate"><span class="pre">pyspark.sql.functions.PandasUDFType</span></code> will be deprecated
in the future.</p>
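<p>As a rough illustration of the migration, the following sketch contrasts the legacy <code class="docutils literal notranslate"><span class="pre">PandasUDFType.GROUPED_MAP</span></code> style
with the preferred <code class="docutils literal notranslate"><span class="pre">applyInPandas()</span></code> call. It assumes an active <code class="docutils literal notranslate"><span class="pre">spark</span></code> session and a small example
DataFrame; the legacy form may emit deprecation warnings on recent Spark versions.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span>import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

# Legacy style (discouraged): a grouped map Pandas UDF declared with PandasUDFType
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)  # type: ignore[call-overload]
def subtract_mean_legacy(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby("id").apply(subtract_mean_legacy).show()

# Preferred style: a plain Python function passed to applyInPandas
def subtract_mean_new(pdf: pd.DataFrame) -&gt; pd.DataFrame:
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby("id").applyInPandas(subtract_mean_new, schema="id long, v double").show()
</pre></div>
</div>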
<section id="grouped-map">
<h3>Grouped Map<a class="headerlink" href="#grouped-map" title="Permalink to this headline">#</a></h3>
<p>Grouped map operations with Pandas instances are supported by <code class="docutils literal notranslate"><span class="pre">DataFrame.groupby().applyInPandas()</span></code>,
which requires a Python function that takes a <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code> and returns another <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code>.
Each group is mapped to a <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code> that is passed to the Python function.</p>
<p>This API implements the “split-apply-combine” pattern which consists of three steps:</p>
<ul class="simple">
<li><p>Split the data into groups by using <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.groupBy.html#pyspark.sql.DataFrame.groupBy" title="pyspark.sql.DataFrame.groupBy"><code class="xref py py-meth docutils literal notranslate"><span class="pre">DataFrame.groupBy()</span></code></a>.</p></li>
<li><p>Apply a function on each group. The input and output of the function are both <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code>. The input data contains all the rows and columns for each group.</p></li>
<li><p>Combine the results into a new PySpark <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame" title="pyspark.sql.DataFrame"><code class="xref py py-class docutils literal notranslate"><span class="pre">DataFrame</span></code></a>.</p></li>
</ul>
<p>To use <code class="docutils literal notranslate"><span class="pre">DataFrame.groupBy().applyInPandas()</span></code>, the user needs to define the following:</p>
<ul class="simple">
<li><p>A Python function that defines the computation for each group.</p></li>
<li><p>A <code class="docutils literal notranslate"><span class="pre">StructType</span></code> object or a string that defines the schema of the output PySpark <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame" title="pyspark.sql.DataFrame"><code class="xref py py-class docutils literal notranslate"><span class="pre">DataFrame</span></code></a>.</p></li>
</ul>
<p>The column labels of the returned <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code> must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See <a class="reference external" href="https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame">pandas.DataFrame</a>
on how to label columns when constructing a <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code>.</p>
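<p>As a minimal sketch of the two labeling options (assuming an output schema of <code class="docutils literal notranslate"><span class="pre">"id long, v double"</span></code>),
the returned frame can either name its columns to match the schema fields or use integer positions:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span>import pandas as pd

# Column labels match the output schema field names ("id", "v")
def by_name(pdf: pd.DataFrame) -&gt; pd.DataFrame:
    return pd.DataFrame({"id": pdf.id, "v": pdf.v - pdf.v.mean()})

# Integer column labels: fields are matched to the schema by position instead
def by_position(pdf: pd.DataFrame) -&gt; pd.DataFrame:
    return pd.DataFrame({0: pdf.id, 1: pdf.v - pdf.v.mean()})
</pre></div>
</div>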
<p>Note that all data for a group will be loaded into memory before the function is applied. This can
lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
<a class="reference internal" href="#setting-arrow-batch-size"><span class="std std-ref">maxRecordsPerBatch</span></a> is not applied on groups and it is up to the user
to ensure that the grouped data will fit into the available memory.</p>
<p>The following example shows how to use <code class="docutils literal notranslate"><span class="pre">DataFrame.groupby().applyInPandas()</span></code> to subtract the mean from each value
in the group.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span>
<span class="p">[(</span><span class="mi">1</span><span class="p">,</span> <span class="mf">1.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mf">2.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">3.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">5.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">10.0</span><span class="p">)],</span>
<span class="p">(</span><span class="s2">&quot;id&quot;</span><span class="p">,</span> <span class="s2">&quot;v&quot;</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">subtract_mean</span><span class="p">(</span><span class="n">pdf</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">:</span>
<span class="c1"># pdf is a pandas.DataFrame</span>
<span class="n">v</span> <span class="o">=</span> <span class="n">pdf</span><span class="o">.</span><span class="n">v</span>
<span class="k">return</span> <span class="n">pdf</span><span class="o">.</span><span class="n">assign</span><span class="p">(</span><span class="n">v</span><span class="o">=</span><span class="n">v</span> <span class="o">-</span> <span class="n">v</span><span class="o">.</span><span class="n">mean</span><span class="p">())</span>
<span class="n">df</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s2">&quot;id&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">applyInPandas</span><span class="p">(</span><span class="n">subtract_mean</span><span class="p">,</span> <span class="n">schema</span><span class="o">=</span><span class="s2">&quot;id long, v double&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+----+</span>
<span class="c1"># | id| v|</span>
<span class="c1"># +---+----+</span>
<span class="c1"># | 1|-0.5|</span>
<span class="c1"># | 1| 0.5|</span>
<span class="c1"># | 2|-3.0|</span>
<span class="c1"># | 2|-1.0|</span>
<span class="c1"># | 2| 4.0|</span>
<span class="c1"># +---+----+</span>
</pre></div>
</div>
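<p>The function passed to <code class="docutils literal notranslate"><span class="pre">applyInPandas()</span></code> may also accept the grouping key as its first argument.
As a rough sketch reusing the <code class="docutils literal notranslate"><span class="pre">df</span></code> defined above, the key arrives as a tuple of the grouping column values:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span>import pandas as pd

# The first parameter receives the grouping key as a tuple, e.g. (1,) or (2,)
def mean_with_key(key, pdf: pd.DataFrame) -&gt; pd.DataFrame:
    return pd.DataFrame([key + (pdf.v.mean(),)])

df.groupby("id").applyInPandas(mean_with_key, schema="id long, mean_v double").show()
</pre></div>
</div>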
<p>For detailed usage, please see <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.GroupedData.applyInPandas.html#pyspark.sql.GroupedData.applyInPandas" title="pyspark.sql.GroupedData.applyInPandas"><code class="xref py py-meth docutils literal notranslate"><span class="pre">GroupedData.applyInPandas()</span></code></a>.</p>
</section>
<section id="map">
<h3>Map<a class="headerlink" href="#map" title="Permalink to this headline">#</a></h3>
<p>Map operations with Pandas instances are supported by <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.mapInPandas.html#pyspark.sql.DataFrame.mapInPandas" title="pyspark.sql.DataFrame.mapInPandas"><code class="xref py py-meth docutils literal notranslate"><span class="pre">DataFrame.mapInPandas()</span></code></a>, which maps an iterator
of <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code>s representing the current
PySpark <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame" title="pyspark.sql.DataFrame"><code class="xref py py-class docutils literal notranslate"><span class="pre">DataFrame</span></code></a> to another iterator of <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code>s and returns the result as a PySpark <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame" title="pyspark.sql.DataFrame"><code class="xref py py-class docutils literal notranslate"><span class="pre">DataFrame</span></code></a>. The function takes and outputs
an iterator of <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code>. In contrast to some Pandas UDFs, it can return output of
arbitrary length, although internally it works similarly to the Series to Series Pandas UDF.</p>
<p>The following example shows how to use <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.mapInPandas.html#pyspark.sql.DataFrame.mapInPandas" title="pyspark.sql.DataFrame.mapInPandas"><code class="xref py py-meth docutils literal notranslate"><span class="pre">DataFrame.mapInPandas()</span></code></a>:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">([(</span><span class="mi">1</span><span class="p">,</span> <span class="mi">21</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mi">30</span><span class="p">)],</span> <span class="p">(</span><span class="s2">&quot;id&quot;</span><span class="p">,</span> <span class="s2">&quot;age&quot;</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">filter_func</span><span class="p">(</span><span class="n">iterator</span><span class="p">:</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">]:</span>
<span class="k">for</span> <span class="n">pdf</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">pdf</span><span class="p">[</span><span class="n">pdf</span><span class="o">.</span><span class="n">id</span> <span class="o">==</span> <span class="mi">1</span><span class="p">]</span>
<span class="n">df</span><span class="o">.</span><span class="n">mapInPandas</span><span class="p">(</span><span class="n">filter_func</span><span class="p">,</span> <span class="n">schema</span><span class="o">=</span><span class="n">df</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+---+</span>
<span class="c1"># | id|age|</span>
<span class="c1"># +---+---+</span>
<span class="c1"># | 1| 21|</span>
<span class="c1"># +---+---+</span>
</pre></div>
</div>
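<p>Because the function receives and yields whole <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code>s, the output does not need to have the
same number of rows as the input. A minimal sketch reusing the <code class="docutils literal notranslate"><span class="pre">df</span></code> above (note that the number of
output rows depends on how many Arrow batches the input is split into):</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span>from typing import Iterable
import pandas as pd

# Yield one summary row per input batch, so the output length differs from the input
def summarize(iterator: Iterable[pd.DataFrame]) -&gt; Iterable[pd.DataFrame]:
    for pdf in iterator:
        yield pd.DataFrame({"id": [pdf.id.max()], "age": [pdf.age.mean()]})

df.mapInPandas(summarize, schema="id long, age double").show()
</pre></div>
</div>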
<p>For detailed usage, please see <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.mapInPandas.html#pyspark.sql.DataFrame.mapInPandas" title="pyspark.sql.DataFrame.mapInPandas"><code class="xref py py-meth docutils literal notranslate"><span class="pre">DataFrame.mapInPandas()</span></code></a>.</p>
</section>
<section id="co-grouped-map">
<h3>Co-grouped Map<a class="headerlink" href="#co-grouped-map" title="Permalink to this headline">#</a></h3>
<p>Co-grouped map operations with Pandas instances are supported by <code class="docutils literal notranslate"><span class="pre">DataFrame.groupby().cogroup().applyInPandas()</span></code>, which
allows two PySpark <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame" title="pyspark.sql.DataFrame"><code class="xref py py-class docutils literal notranslate"><span class="pre">DataFrame</span></code></a>s to be cogrouped by a common key and a Python function to be applied to each
cogroup. It consists of the following steps:</p>
<ul class="simple">
<li><p>Shuffle the data such that the groups of each dataframe which share a key are cogrouped together.</p></li>
<li><p>Apply a function to each cogroup. The input of the function is two <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code> (with an optional tuple representing the key). The output of the function is a <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code>.</p></li>
<li><p>Combine the <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code>s from all groups into a new PySpark <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame" title="pyspark.sql.DataFrame"><code class="xref py py-class docutils literal notranslate"><span class="pre">DataFrame</span></code></a>.</p></li>
</ul>
<p>To use <code class="docutils literal notranslate"><span class="pre">groupBy().cogroup().applyInPandas()</span></code>, the user needs to define the following:</p>
<ul class="simple">
<li><p>A Python function that defines the computation for each cogroup.</p></li>
<li><p>A <code class="docutils literal notranslate"><span class="pre">StructType</span></code> object or a string that defines the schema of the output PySpark <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame" title="pyspark.sql.DataFrame"><code class="xref py py-class docutils literal notranslate"><span class="pre">DataFrame</span></code></a>.</p></li>
</ul>
<p>The column labels of the returned <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code> must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See <a class="reference external" href="https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame">pandas.DataFrame</a>
on how to label columns when constructing a <code class="docutils literal notranslate"><span class="pre">pandas.DataFrame</span></code>.</p>
<p>Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of
memory exceptions, especially if the group sizes are skewed. The configuration for <a class="reference internal" href="#setting-arrow-batch-size"><span class="std std-ref">maxRecordsPerBatch</span></a>
is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.</p>
<p>The following example shows how to use <code class="docutils literal notranslate"><span class="pre">DataFrame.groupby().cogroup().applyInPandas()</span></code> to perform an ordered merge between two datasets.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span>
<span class="n">df1</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span>
<span class="p">[(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mf">1.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mf">2.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000102</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mf">3.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000102</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mf">4.0</span><span class="p">)],</span>
<span class="p">(</span><span class="s2">&quot;time&quot;</span><span class="p">,</span> <span class="s2">&quot;id&quot;</span><span class="p">,</span> <span class="s2">&quot;v1&quot;</span><span class="p">))</span>
<span class="n">df2</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span>
<span class="p">[(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="s2">&quot;x&quot;</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="s2">&quot;y&quot;</span><span class="p">)],</span>
<span class="p">(</span><span class="s2">&quot;time&quot;</span><span class="p">,</span> <span class="s2">&quot;id&quot;</span><span class="p">,</span> <span class="s2">&quot;v2&quot;</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">merge_ordered</span><span class="p">(</span><span class="n">left</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">,</span> <span class="n">right</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">:</span>
<span class="k">return</span> <span class="n">pd</span><span class="o">.</span><span class="n">merge_ordered</span><span class="p">(</span><span class="n">left</span><span class="p">,</span> <span class="n">right</span><span class="p">)</span>
<span class="n">df1</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s2">&quot;id&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">cogroup</span><span class="p">(</span><span class="n">df2</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s2">&quot;id&quot;</span><span class="p">))</span><span class="o">.</span><span class="n">applyInPandas</span><span class="p">(</span>
<span class="n">merge_ordered</span><span class="p">,</span> <span class="n">schema</span><span class="o">=</span><span class="s2">&quot;time int, id int, v1 double, v2 string&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +--------+---+---+----+</span>
<span class="c1"># | time| id| v1| v2|</span>
<span class="c1"># +--------+---+---+----+</span>
<span class="c1"># |20000101| 1|1.0| x|</span>
<span class="c1"># |20000102| 1|3.0|null|</span>
<span class="c1"># |20000101| 2|2.0| y|</span>
<span class="c1"># |20000102| 2|4.0|null|</span>
<span class="c1"># +--------+---+---+----+</span>
</pre></div>
</div>
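<p>As with grouped map, the cogrouped function may optionally take the grouping key as its first argument.
A minimal sketch reusing <code class="docutils literal notranslate"><span class="pre">df1</span></code> and <code class="docutils literal notranslate"><span class="pre">df2</span></code> above, with the same output schema:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span>import pandas as pd

# The first parameter receives the common grouping key as a tuple, e.g. (1,)
def merge_with_key(key, left: pd.DataFrame, right: pd.DataFrame) -&gt; pd.DataFrame:
    return pd.merge_ordered(left, right)

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    merge_with_key, schema="time int, id int, v1 double, v2 string").show()
</pre></div>
</div>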
<p>For detailed usage, please see <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.PandasCogroupedOps.applyInPandas.html#pyspark.sql.PandasCogroupedOps.applyInPandas" title="pyspark.sql.PandasCogroupedOps.applyInPandas"><code class="xref py py-meth docutils literal notranslate"><span class="pre">PandasCogroupedOps.applyInPandas()</span></code></a>.</p>
</section>
</section>
<section id="arrow-python-udfs">
<h2>Arrow Python UDFs<a class="headerlink" href="#arrow-python-udfs" title="Permalink to this headline">#</a></h2>
<p>Arrow Python UDFs are user-defined functions that are executed row by row, utilizing Arrow for efficient batch data
transfer and serialization. To define an Arrow Python UDF, you can use the <code class="xref py py-meth docutils literal notranslate"><span class="pre">udf()</span></code> decorator or wrap the function
with the <code class="xref py py-meth docutils literal notranslate"><span class="pre">udf()</span></code> method, ensuring the <code class="docutils literal notranslate"><span class="pre">useArrow</span></code> parameter is set to True. Additionally, you can enable Arrow
optimization for Python UDFs throughout the entire SparkSession by setting the Spark configuration
<code class="docutils literal notranslate"><span class="pre">spark.sql.execution.pythonUDF.arrow.enabled</span></code> to true. It’s important to note that the Spark configuration takes
effect only when <code class="docutils literal notranslate"><span class="pre">useArrow</span></code> is either not set or set to None.</p>
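<p>For example, to enable Arrow optimization for all Python UDFs in the current session, the configuration
can be set at runtime (a sketch assuming an active <code class="docutils literal notranslate"><span class="pre">spark</span></code> session):</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span># Enable Arrow-optimized Python UDFs session-wide; an individual UDF can still
# opt out by passing useArrow=False explicitly, since the configuration only
# applies when useArrow is not set or is None.
spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "true")
</pre></div>
</div>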
<p>The type hints for Arrow Python UDFs should be specified in the same way as for default, pickled Python UDFs.</p>
<p>Here’s an example that demonstrates the usage of both a default, pickled Python UDF and an Arrow Python UDF:</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">udf</span>
<span class="nd">@udf</span><span class="p">(</span><span class="n">returnType</span><span class="o">=</span><span class="s1">&#39;int&#39;</span><span class="p">)</span> <span class="c1"># A default, pickled Python UDF</span>
<span class="k">def</span> <span class="nf">slen</span><span class="p">(</span><span class="n">s</span><span class="p">):</span> <span class="c1"># type: ignore[no-untyped-def]</span>
<span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="n">s</span><span class="p">)</span>
<span class="nd">@udf</span><span class="p">(</span><span class="n">returnType</span><span class="o">=</span><span class="s1">&#39;int&#39;</span><span class="p">,</span> <span class="n">useArrow</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> <span class="c1"># An Arrow Python UDF</span>
<span class="k">def</span> <span class="nf">arrow_slen</span><span class="p">(</span><span class="n">s</span><span class="p">):</span> <span class="c1"># type: ignore[no-untyped-def]</span>
<span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="n">s</span><span class="p">)</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">([(</span><span class="mi">1</span><span class="p">,</span> <span class="s2">&quot;John Doe&quot;</span><span class="p">,</span> <span class="mi">21</span><span class="p">)],</span> <span class="p">(</span><span class="s2">&quot;id&quot;</span><span class="p">,</span> <span class="s2">&quot;name&quot;</span><span class="p">,</span> <span class="s2">&quot;age&quot;</span><span class="p">))</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">slen</span><span class="p">(</span><span class="s2">&quot;name&quot;</span><span class="p">),</span> <span class="n">arrow_slen</span><span class="p">(</span><span class="s2">&quot;name&quot;</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +----------+----------------+</span>
<span class="c1"># |slen(name)|arrow_slen(name)|</span>
<span class="c1"># +----------+----------------+</span>
<span class="c1"># | 8| 8|</span>
<span class="c1"># +----------+----------------+</span>
</pre></div>
</div>
<p>Compared to the default, pickled Python UDFs, Arrow Python UDFs provide a more coherent type coercion mechanism. UDF
type coercion poses challenges when the Python instances returned by UDFs do not align with the user-specified
return type. The default, pickled Python UDFs’ type coercion has certain limitations, such as relying on None as a
fallback for type mismatches, leading to potential ambiguity and data loss. Additionally, converting date, datetime,
and tuples to strings can yield ambiguous results. Arrow Python UDFs, on the other hand, leverage Arrow’s
capabilities to standardize type coercion and address these issues effectively.</p>
</section>
<section id="usage-notes">
<h2>Usage Notes<a class="headerlink" href="#usage-notes" title="Permalink to this headline">#</a></h2>
<section id="supported-sql-types">
<h3>Supported SQL Types<a class="headerlink" href="#supported-sql-types" title="Permalink to this headline">#</a></h3>
<p>Currently, all Spark SQL data types are supported by Arrow-based conversion except
<a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.types.ArrayType.html#pyspark.sql.types.ArrayType" title="pyspark.sql.types.ArrayType"><code class="xref py py-class docutils literal notranslate"><span class="pre">ArrayType</span></code></a> of <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.types.TimestampType.html#pyspark.sql.types.TimestampType" title="pyspark.sql.types.TimestampType"><code class="xref py py-class docutils literal notranslate"><span class="pre">TimestampType</span></code></a>.
<a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.types.MapType.html#pyspark.sql.types.MapType" title="pyspark.sql.types.MapType"><code class="xref py py-class docutils literal notranslate"><span class="pre">MapType</span></code></a> and <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.types.ArrayType.html#pyspark.sql.types.ArrayType" title="pyspark.sql.types.ArrayType"><code class="xref py py-class docutils literal notranslate"><span class="pre">ArrayType</span></code></a> of nested <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.types.StructType.html#pyspark.sql.types.StructType" title="pyspark.sql.types.StructType"><code class="xref py py-class docutils literal notranslate"><span class="pre">StructType</span></code></a> are only supported
when using PyArrow 2.0.0 and above.</p>
</section>
<section id="setting-arrow-batch-size">
<h3>Setting Arrow Batch Size<a class="headerlink" href="#setting-arrow-batch-size" title="Permalink to this headline">#</a></h3>
<p>Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
record batches can be adjusted by setting the conf <code class="docutils literal notranslate"><span class="pre">spark.sql.execution.arrow.maxRecordsPerBatch</span></code>
to an integer that will determine the maximum number of rows for each batch. The default value is
10,000 records per batch. If the number of columns is large, the value should be adjusted
accordingly. Using this limit, each data partition will be made into one or more record batches for
processing.</p>
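<p>For example, a sketch of lowering the batch size at runtime (assuming an active <code class="docutils literal notranslate"><span class="pre">spark</span></code> session;
the appropriate value depends on the row width and the available memory):</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span># Reduce the maximum number of rows per Arrow record batch, e.g. for very wide rows
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
</pre></div>
</div>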
</section>
<section id="timestamp-with-time-zone-semantics">
<h3>Timestamp with Time Zone Semantics<a class="headerlink" href="#timestamp-with-time-zone-semantics" title="Permalink to this headline">#</a></h3>
<p>Spark internally stores timestamps as UTC values, and timestamp data that is brought in without
a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp
data is exported or displayed in Spark, the session time zone is used to localize the timestamp
values. The session time zone is set with the configuration <code class="docutils literal notranslate"><span class="pre">spark.sql.session.timeZone</span></code> and will
default to the JVM system local time zone if not set. Pandas uses a <code class="docutils literal notranslate"><span class="pre">datetime64</span></code> type with nanosecond
resolution, <code class="docutils literal notranslate"><span class="pre">datetime64[ns]</span></code>, with optional time zone on a per-column basis.</p>
<p>When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds
and each column will be converted to the Spark session time zone then localized to that time
zone, which removes the time zone and displays values as local time. This will occur
when calling <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.DataFrame.toPandas.html#pyspark.sql.DataFrame.toPandas" title="pyspark.sql.DataFrame.toPandas"><code class="xref py py-meth docutils literal notranslate"><span class="pre">DataFrame.toPandas()</span></code></a> or <code class="docutils literal notranslate"><span class="pre">pandas_udf</span></code> with timestamp columns.</p>
<p>When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This
occurs when calling <a class="reference internal" href="../../reference/pyspark.sql/api/pyspark.sql.SparkSession.createDataFrame.html#pyspark.sql.SparkSession.createDataFrame" title="pyspark.sql.SparkSession.createDataFrame"><code class="xref py py-meth docutils literal notranslate"><span class="pre">SparkSession.createDataFrame()</span></code></a> with a Pandas DataFrame or when returning a timestamp from a
<code class="docutils literal notranslate"><span class="pre">pandas_udf</span></code>. These conversions are done automatically to ensure Spark will have data in the
expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond
values will be truncated.</p>
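<p>A small sketch of the round trip described above (assuming an active <code class="docutils literal notranslate"><span class="pre">spark</span></code> session; the displayed
values depend on the configured session time zone):</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span>import datetime
import pandas as pd

spark.conf.set("spark.sql.session.timeZone", "UTC")  # time zone used when localizing

pdf = pd.DataFrame({"ts": [datetime.datetime(2000, 1, 1, 12, 0, 0)]})
sdf = spark.createDataFrame(pdf)  # naive local times are converted to UTC microseconds
result = sdf.toPandas()           # datetime64[ns] values localized to the session time zone
</pre></div>
</div>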
<p>Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
different from a Pandas timestamp. It is recommended to use Pandas time series functionality when
working with timestamps in <code class="docutils literal notranslate"><span class="pre">pandas_udf</span></code>s to get the best performance, see
<a class="reference external" href="https://pandas.pydata.org/pandas-docs/stable/timeseries.html">here</a> for details.</p>
</section>
<section id="recommended-pandas-and-pyarrow-versions">
<h3>Recommended Pandas and PyArrow Versions<a class="headerlink" href="#recommended-pandas-and-pyarrow-versions" title="Permalink to this headline">#</a></h3>
<p>For usage with pyspark.sql, the minimum supported version of Pandas is 2.0.0 and the minimum supported
version of PyArrow is 10.0.0. Higher versions may be used; however, compatibility and data correctness
cannot be guaranteed and should be verified by the user.</p>
</section>
<section id="setting-arrow-self-destruct-for-memory-savings">
<h3>Setting Arrow <code class="docutils literal notranslate"><span class="pre">self_destruct</span></code> for memory savings<a class="headerlink" href="#setting-arrow-self-destruct-for-memory-savings" title="Permalink to this headline">#</a></h3>
<p>Since Spark 3.2, the Spark configuration <code class="docutils literal notranslate"><span class="pre">spark.sql.execution.arrow.pyspark.selfDestruct.enabled</span></code>
can be used to enable PyArrow’s <code class="docutils literal notranslate"><span class="pre">self_destruct</span></code> feature, which can save memory when creating a
Pandas DataFrame via <code class="docutils literal notranslate"><span class="pre">toPandas</span></code> by freeing Arrow-allocated memory while building the Pandas
DataFrame. This option can also save memory when creating a PyArrow Table via <code class="docutils literal notranslate"><span class="pre">toArrow</span></code>.
This option is experimental. When used with <code class="docutils literal notranslate"><span class="pre">toPandas</span></code>, some operations may fail on the resulting
Pandas DataFrame due to immutable backing arrays. Typically, you would see the error
<code class="docutils literal notranslate"><span class="pre">ValueError:</span> <span class="pre">buffer</span> <span class="pre">source</span> <span class="pre">array</span> <span class="pre">is</span> <span class="pre">read-only</span></code>. Newer versions of Pandas may fix these errors by
improving support for such cases. You can work around this error by copying the column(s)
beforehand. Additionally, this conversion may be slower because it is single-threaded.</p>
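<p>A sketch of enabling the option and applying the copy workaround mentioned above (assuming an active
<code class="docutils literal notranslate"><span class="pre">spark</span></code> session and a DataFrame <code class="docutils literal notranslate"><span class="pre">df</span></code> with a column <code class="docutils literal notranslate"><span class="pre">v</span></code>):</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span># Free Arrow-allocated memory while the Pandas DataFrame is being built
spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")

pdf = df.toPandas()
# If a later operation fails with "ValueError: buffer source array is read-only",
# copy the affected column(s) first to get a mutable backing array.
pdf["v"] = pdf["v"].copy()
</pre></div>
</div>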
</section>
</section>
</section>
</article>
<footer class="bd-footer-article">
<div class="footer-article-items footer-article__inner">
<div class="footer-article-item"><!-- Previous / next buttons -->
<div class="prev-next-area">
<a class="left-prev"
href="index.html"
title="previous page">
<i class="fa-solid fa-angle-left"></i>
<div class="prev-next-info">
<p class="prev-next-subtitle">previous</p>
<p class="prev-next-title">Spark SQL</p>
</div>
</a>
<a class="right-next"
href="python_udtf.html"
title="next page">
<div class="prev-next-info">
<p class="prev-next-subtitle">next</p>
<p class="prev-next-title">Python User-defined Table Functions (UDTFs)</p>
</div>
<i class="fa-solid fa-angle-right"></i>
</a>
</div></div>
</div>
</footer>
</div>
<div class="bd-sidebar-secondary bd-toc"><div class="sidebar-secondary-items sidebar-secondary__inner">
<div class="sidebar-secondary-item">
<div class="page-toc tocsection onthispage">
<i class="fa-solid fa-list"></i> On this page
</div>
<nav class="bd-toc-nav page-toc">
<ul class="visible nav section-nav flex-column">
<li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#ensure-pyarrow-installed">Ensure PyArrow Installed</a></li>
<li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#conversion-to-arrow-table">Conversion to Arrow Table</a></li>
<li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#enabling-for-conversion-to-from-pandas">Enabling for Conversion to/from Pandas</a></li>
<li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#pandas-udfs-a-k-a-vectorized-udfs">Pandas UDFs (a.k.a. Vectorized UDFs)</a><ul class="nav section-nav flex-column">
<li class="toc-h3 nav-item toc-entry"><a class="reference internal nav-link" href="#series-to-series">Series to Series</a></li>
<li class="toc-h3 nav-item toc-entry"><a class="reference internal nav-link" href="#iterator-of-series-to-iterator-of-series">Iterator of Series to Iterator of Series</a></li>
<li class="toc-h3 nav-item toc-entry"><a class="reference internal nav-link" href="#iterator-of-multiple-series-to-iterator-of-series">Iterator of Multiple Series to Iterator of Series</a></li>
<li class="toc-h3 nav-item toc-entry"><a class="reference internal nav-link" href="#series-to-scalar">Series to Scalar</a></li>
</ul>
</li>
<li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#pandas-function-apis">Pandas Function APIs</a><ul class="nav section-nav flex-column">
<li class="toc-h3 nav-item toc-entry"><a class="reference internal nav-link" href="#grouped-map">Grouped Map</a></li>
<li class="toc-h3 nav-item toc-entry"><a class="reference internal nav-link" href="#map">Map</a></li>
<li class="toc-h3 nav-item toc-entry"><a class="reference internal nav-link" href="#co-grouped-map">Co-grouped Map</a></li>
</ul>
</li>
<li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#arrow-python-udfs">Arrow Python UDFs</a></li>
<li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#usage-notes">Usage Notes</a><ul class="nav section-nav flex-column">
<li class="toc-h3 nav-item toc-entry"><a class="reference internal nav-link" href="#supported-sql-types">Supported SQL Types</a></li>
<li class="toc-h3 nav-item toc-entry"><a class="reference internal nav-link" href="#setting-arrow-batch-size">Setting Arrow Batch Size</a></li>
<li class="toc-h3 nav-item toc-entry"><a class="reference internal nav-link" href="#timestamp-with-time-zone-semantics">Timestamp with Time Zone Semantics</a></li>
<li class="toc-h3 nav-item toc-entry"><a class="reference internal nav-link" href="#recommended-pandas-and-pyarrow-versions">Recommended Pandas and PyArrow Versions</a></li>
<li class="toc-h3 nav-item toc-entry"><a class="reference internal nav-link" href="#setting-arrow-self-destruct-for-memory-savings">Setting Arrow <code class="docutils literal notranslate"><span class="pre">self_destruct</span></code> for memory savings</a></li>
</ul>
</li>
</ul>
</nav></div>
<div class="sidebar-secondary-item">
<div class="tocsection sourcelink">
<a href="../../_sources/user_guide/sql/arrow_pandas.rst.txt">
<i class="fa-solid fa-file-lines"></i> Show Source
</a>
</div>
</div>
</div></div>
</div>
<footer class="bd-footer-content">
</footer>
</main>
</div>
</div>
<!-- Scripts loaded after <body> so the DOM is not blocked -->
<script src="../../_static/scripts/bootstrap.js?digest=e353d410970836974a52"></script>
<script src="../../_static/scripts/pydata-sphinx-theme.js?digest=e353d410970836974a52"></script>
<footer class="bd-footer">
<div class="bd-footer__inner bd-page-width">
<div class="footer-items__start">
<div class="footer-item"><p class="copyright">
Copyright @ 2024 The Apache Software Foundation, Licensed under the <a href="https://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
</p></div>
<div class="footer-item">
<p class="sphinx-version">
Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 4.5.0.
<br/>
</p>
</div>
</div>
<div class="footer-items__end">
<div class="footer-item"><p class="theme-version">
Built with the <a href="https://pydata-sphinx-theme.readthedocs.io/en/stable/index.html">PyData Sphinx Theme</a> 0.13.3.
</p></div>
</div>
</div>
</footer>
</body>
</html>