<!DOCTYPE html>
<!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<title>PySpark Usage Guide for Pandas with Apache Arrow - Spark 3.0.0 Documentation</title>
<link rel="stylesheet" href="css/bootstrap.min.css">
<style>
body {
padding-top: 60px;
padding-bottom: 40px;
}
</style>
<meta name="viewport" content="width=device-width">
<link rel="stylesheet" href="css/bootstrap-responsive.min.css">
<link rel="stylesheet" href="css/main.css">
<script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
<link rel="stylesheet" href="css/pygments-default.css">
<!-- Google analytics script -->
<script type="text/javascript">
var _gaq = _gaq || [];
_gaq.push(['_setAccount', 'UA-32518208-2']);
_gaq.push(['_trackPageview']);
(function() {
var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
})();
</script>
</head>
<body>
<!--[if lt IE 7]>
<p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
<![endif]-->
<!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
<div class="navbar navbar-fixed-top" id="topbar">
<div class="navbar-inner">
<div class="container">
<div class="brand"><a href="index.html">
<img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">3.0.0</span>
</div>
<ul class="nav">
<!--TODO(andyk): Add class="active" attribute to li some how.-->
<li><a href="index.html">Overview</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="quick-start.html">Quick Start</a></li>
<li><a href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a></li>
<li><a href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a></li>
<li><a href="structured-streaming-programming-guide.html">Structured Streaming</a></li>
<li><a href="streaming-programming-guide.html">Spark Streaming (DStreams)</a></li>
<li><a href="ml-guide.html">MLlib (Machine Learning)</a></li>
<li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
<li><a href="sparkr.html">SparkR (R on Spark)</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="api/scala/org/apache/spark/index.html">Scala</a></li>
<li><a href="api/java/index.html">Java</a></li>
<li><a href="api/python/index.html">Python</a></li>
<li><a href="api/R/index.html">R</a></li>
<li><a href="api/sql/index.html">SQL, Built-in Functions</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="cluster-overview.html">Overview</a></li>
<li><a href="submitting-applications.html">Submitting Applications</a></li>
<li class="divider"></li>
<li><a href="spark-standalone.html">Spark Standalone</a></li>
<li><a href="running-on-mesos.html">Mesos</a></li>
<li><a href="running-on-yarn.html">YARN</a></li>
<li><a href="running-on-kubernetes.html">Kubernetes</a></li>
</ul>
</li>
<li class="dropdown">
<a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="configuration.html">Configuration</a></li>
<li><a href="monitoring.html">Monitoring</a></li>
<li><a href="tuning.html">Tuning Guide</a></li>
<li><a href="job-scheduling.html">Job Scheduling</a></li>
<li><a href="security.html">Security</a></li>
<li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
<li><a href="migration-guide.html">Migration Guide</a></li>
<li class="divider"></li>
<li><a href="building-spark.html">Building Spark</a></li>
<li><a href="https://spark.apache.org/contributing.html">Contributing to Spark</a></li>
<li><a href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a></li>
</ul>
</li>
</ul>
<!--<p class="navbar-text pull-right"><span class="version-text">v3.0.0</span></p>-->
</div>
</div>
</div>
<div class="container-wrapper">
<div class="left-menu-wrapper">
<div class="left-menu">
<h3><a href="sql-programming-guide.html">Spark SQL Guide</a></h3>
<ul>
<li>
<a href="sql-getting-started.html">
Getting Started
</a>
</li>
<li>
<a href="sql-data-sources.html">
Data Sources
</a>
</li>
<li>
<a href="sql-performance-tuning.html">
Performance Tuning
</a>
</li>
<li>
<a href="sql-distributed-sql-engine.html">
Distributed SQL Engine
</a>
</li>
<li>
<a href="sql-pyspark-pandas-with-arrow.html">
<b>PySpark Usage Guide for Pandas with Apache Arrow</b>
</a>
</li>
<ul>
<li>
<a href="sql-pyspark-pandas-with-arrow.html#apache-arrow-in-spark">
Apache Arrow in Spark
</a>
</li>
<li>
<a href="sql-pyspark-pandas-with-arrow.html#enabling-for-conversion-tofrom-pandas">
Enabling for Conversion to/from Pandas
</a>
</li>
<li>
<a href="sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs">
Pandas UDFs (a.k.a. Vectorized UDFs)
</a>
</li>
<li>
<a href="sql-pyspark-pandas-with-arrow.html#pandas-function-apis">
Pandas Function APIs
</a>
</li>
<li>
<a href="sql-pyspark-pandas-with-arrow.html#usage-notes">
Usage Notes
</a>
</li>
</ul>
<li>
<a href="sql-migration-old.html">
Migration Guide
</a>
</li>
<li>
<a href="sql-ref.html">
SQL Reference
</a>
</li>
</ul>
</div>
</div>
<input id="nav-trigger" class="nav-trigger" checked type="checkbox">
<label for="nav-trigger"></label>
<div class="content-with-sidebar" id="content">
<h1 class="title">PySpark Usage Guide for Pandas with Apache Arrow</h1>
<ul id="markdown-toc">
<li><a href="#apache-arrow-in-pyspark" id="markdown-toc-apache-arrow-in-pyspark">Apache Arrow in PySpark</a> <ul>
<li><a href="#ensure-pyarrow-installed" id="markdown-toc-ensure-pyarrow-installed">Ensure PyArrow Installed</a></li>
</ul>
</li>
<li><a href="#enabling-for-conversion-tofrom-pandas" id="markdown-toc-enabling-for-conversion-tofrom-pandas">Enabling for Conversion to/from Pandas</a></li>
<li><a href="#pandas-udfs-aka-vectorized-udfs" id="markdown-toc-pandas-udfs-aka-vectorized-udfs">Pandas UDFs (a.k.a. Vectorized UDFs)</a> <ul>
<li><a href="#series-to-series" id="markdown-toc-series-to-series">Series to Series</a></li>
<li><a href="#iterator-of-series-to-iterator-of-series" id="markdown-toc-iterator-of-series-to-iterator-of-series">Iterator of Series to Iterator of Series</a></li>
<li><a href="#iterator-of-multiple-series-to-iterator-of-series" id="markdown-toc-iterator-of-multiple-series-to-iterator-of-series">Iterator of Multiple Series to Iterator of Series</a></li>
<li><a href="#series-to-scalar" id="markdown-toc-series-to-scalar">Series to Scalar</a></li>
</ul>
</li>
<li><a href="#pandas-function-apis" id="markdown-toc-pandas-function-apis">Pandas Function APIs</a> <ul>
<li><a href="#grouped-map" id="markdown-toc-grouped-map">Grouped Map</a></li>
<li><a href="#map" id="markdown-toc-map">Map</a></li>
<li><a href="#co-grouped-map" id="markdown-toc-co-grouped-map">Co-grouped Map</a></li>
</ul>
</li>
<li><a href="#usage-notes" id="markdown-toc-usage-notes">Usage Notes</a> <ul>
<li><a href="#supported-sql-types" id="markdown-toc-supported-sql-types">Supported SQL Types</a></li>
<li><a href="#setting-arrow-batch-size" id="markdown-toc-setting-arrow-batch-size">Setting Arrow Batch Size</a></li>
<li><a href="#timestamp-with-time-zone-semantics" id="markdown-toc-timestamp-with-time-zone-semantics">Timestamp with Time Zone Semantics</a></li>
<li><a href="#recommended-pandas-and-pyarrow-versions" id="markdown-toc-recommended-pandas-and-pyarrow-versions">Recommended Pandas and PyArrow Versions</a></li>
<li><a href="#compatibility-setting-for-pyarrow--0150-and-spark-23x-24x" id="markdown-toc-compatibility-setting-for-pyarrow--0150-and-spark-23x-24x">Compatibility Setting for PyArrow &gt;= 0.15.0 and Spark 2.3.x, 2.4.x</a></li>
</ul>
</li>
</ul>
<h2 id="apache-arrow-in-pyspark">Apache Arrow in PySpark</h2>
<p>Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
data between JVM and Python processes. It is currently most beneficial to Python users who
work with Pandas/NumPy data. Its usage is not automatic and might require some minor
changes to configuration or code to take full advantage and ensure compatibility. This guide
gives a high-level description of how to use Arrow in Spark and highlights any differences when
working with Arrow-enabled data.</p>
<h3 id="ensure-pyarrow-installed">Ensure PyArrow Installed</h3>
<p>To use Apache Arrow in PySpark, <a href="#recommended-pandas-and-pyarrow-versions">the recommended version of PyArrow</a>
should be installed.
If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the
SQL module with the command <code class="highlighter-rouge">pip install pyspark[sql]</code>. Otherwise, you must ensure that PyArrow
is installed and available on all cluster nodes.
You can install it using pip, or using conda from the conda-forge channel. See the PyArrow
<a href="https://arrow.apache.org/docs/python/install.html">installation</a> page for details.</p>
<h2 id="enabling-for-conversion-tofrom-pandas">Enabling for Conversion to/from Pandas</h2>
<p>Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
using the call <code class="highlighter-rouge">toPandas()</code> and when creating a Spark DataFrame from a Pandas DataFrame with
<code class="highlighter-rouge">createDataFrame(pandas_df)</code>. To use Arrow when executing these calls, users need to first set
the Spark configuration <code class="highlighter-rouge">spark.sql.execution.arrow.pyspark.enabled</code> to <code class="highlighter-rouge">true</code>. This is disabled by default.</p>
<p>In addition, the optimizations enabled by <code class="highlighter-rouge">spark.sql.execution.arrow.pyspark.enabled</code> can automatically fall back
to a non-Arrow implementation if an error occurs before the actual computation within Spark.
This behavior is controlled by <code class="highlighter-rouge">spark.sql.execution.arrow.pyspark.fallback.enabled</code>.</p>
<div class="codetabs">
<div data-lang="python">
<div class="highlight"><pre class="codehilite"><code><span class="kn">import</span> <span class="nn">numpy</span> <span class="k">as</span> <span class="n">np</span>
<span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span>
<span class="c1"># Enable Arrow-based columnar data transfers
</span><span class="n">spark</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="nb">set</span><span class="p">(</span><span class="s">"spark.sql.execution.arrow.pyspark.enabled"</span><span class="p">,</span> <span class="s">"true"</span><span class="p">)</span>
<span class="c1"># Generate a Pandas DataFrame
</span><span class="n">pdf</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">(</span><span class="n">np</span><span class="o">.</span><span class="n">random</span><span class="o">.</span><span class="n">rand</span><span class="p">(</span><span class="mi">100</span><span class="p">,</span> <span class="mi">3</span><span class="p">))</span>
<span class="c1"># Create a Spark DataFrame from a Pandas DataFrame using Arrow
</span><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">pdf</span><span class="p">)</span>
<span class="c1"># Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
</span><span class="n">result_pdf</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s">"*"</span><span class="p">)</span><span class="o">.</span><span class="n">toPandas</span><span class="p">()</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div>
</div>
</div>
<p>Using the above optimizations with Arrow will produce the same results as when Arrow is not
enabled. Note that even with Arrow, <code class="highlighter-rouge">toPandas()</code> results in the collection of all records in the
DataFrame to the driver program and should be done on a small subset of the data. Not all Spark
data types are currently supported and an error can be raised if a column has an unsupported type,
see <a href="#supported-sql-types">Supported SQL Types</a>. If an error occurs during <code class="highlighter-rouge">createDataFrame()</code>,
Spark will fall back to create the DataFrame without Arrow.</p>
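<p>If you prefer that Arrow-related errors surface immediately rather than silently falling back to the non-Arrow path, the fallback can also be disabled. This is a minimal sketch, assuming an existing <code class="highlighter-rouge">SparkSession</code> named <code class="highlighter-rouge">spark</code>:</p>
<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code># Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Fail loudly instead of falling back to the non-Arrow implementation
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
</code></pre></div></div>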
<h2 id="pandas-udfs-aka-vectorized-udfs">Pandas UDFs (a.k.a. Vectorized UDFs)</h2>
<p>Pandas UDFs are user-defined functions that are executed by Spark using
Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas
UDF is defined using <code class="highlighter-rouge">pandas_udf</code> as a decorator or by wrapping the function, and no additional
configuration is required. A Pandas UDF generally behaves as a regular PySpark function API.</p>
<p>Before Spark 3.0, Pandas UDFs used to be defined with <code class="highlighter-rouge">PandasUDFType</code>. From Spark 3.0
with Python 3.6+, you can also use <a href="https://www.python.org/dev/peps/pep-0484">Python type hints</a>.
Using Python type hints is preferred, and <code class="highlighter-rouge">PandasUDFType</code> will be deprecated in
a future release.</p>
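<p>For illustration, the same scalar Pandas UDF can be written in both styles. This is a minimal sketch and is not part of the example file:</p>
<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Spark 2.x style with PandasUDFType, to be deprecated in a future release
@pandas_udf("long", PandasUDFType.SCALAR)
def plus_one_legacy(v):
    return v + 1

# Spark 3.0 style with Python type hints (preferred)
@pandas_udf("long")
def plus_one(v: pd.Series) -&gt; pd.Series:
    return v + 1
</code></pre></div></div>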
<p>Note that the type hint should use <code class="highlighter-rouge">pandas.Series</code> in all cases, with one variant:
<code class="highlighter-rouge">pandas.DataFrame</code> should be used as the input or output type hint instead when the input
or output column is of <code class="highlighter-rouge">StructType</code>. The following example shows a Pandas UDF which takes a long
column, a string column and a struct column, and outputs a struct column. It requires the function to
specify the type hints of <code class="highlighter-rouge">pandas.Series</code> and <code class="highlighter-rouge">pandas.DataFrame</code> as below:</p>
<p>
<div class="codetabs">
<div data-lang="python">
<div class="highlight"><pre class="codehilite"><code><span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">pandas_udf</span>
<span class="o">@</span><span class="n">pandas_udf</span><span class="p">(</span><span class="s">"col1 string, col2 long"</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">func</span><span class="p">(</span><span class="n">s1</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">,</span> <span class="n">s2</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">,</span> <span class="n">s3</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">:</span>
<span class="n">s3</span><span class="p">[</span><span class="s">'col2'</span><span class="p">]</span> <span class="o">=</span> <span class="n">s1</span> <span class="o">+</span> <span class="n">s2</span><span class="o">.</span><span class="nb">str</span><span class="o">.</span><span class="nb">len</span><span class="p">()</span>
<span class="k">return</span> <span class="n">s3</span>
<span class="c1"># Create a Spark DataFrame that has three columns including a sturct column.
</span><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span>
<span class="p">[[</span><span class="mi">1</span><span class="p">,</span> <span class="s">"a string"</span><span class="p">,</span> <span class="p">(</span><span class="s">"a nested string"</span><span class="p">,)]],</span>
<span class="s">"long_col long, string_col string, struct_col struct&lt;col1:string&gt;"</span><span class="p">)</span>
<span class="n">df</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span>
<span class="c1"># root
# |-- long_col: long (nullable = true)
# |-- string_col: string (nullable = true)
# |-- struct_col: struct (nullable = true)
# | |-- col1: string (nullable = true)
</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">func</span><span class="p">(</span><span class="s">"long_col"</span><span class="p">,</span> <span class="s">"string_col"</span><span class="p">,</span> <span class="s">"struct_col"</span><span class="p">))</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span>
<span class="c1"># |-- func(long_col, string_col, struct_col): struct (nullable = true)
# | |-- col1: string (nullable = true)
# | |-- col2: long (nullable = true)</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div>
</div>
</div>
</p>
<p>The following sections describe the combinations of supported type hints. For simplicity, the
<code class="highlighter-rouge">pandas.DataFrame</code> variant is omitted.</p>
<h3 id="series-to-series">Series to Series</h3>
<p>The type hint can be expressed as <code class="highlighter-rouge">pandas.Series</code>, &#8230; -&gt; <code class="highlighter-rouge">pandas.Series</code>.</p>
<p>By using <code class="highlighter-rouge">pandas_udf</code> with the function having such type hints above, it creates a Pandas UDF where the given
function takes one or more <code class="highlighter-rouge">pandas.Series</code> and outputs one <code class="highlighter-rouge">pandas.Series</code>. The output of the function should
always be of the same length as the input. Internally, PySpark will execute a Pandas UDF by splitting
columns into batches and calling the function for each batch as a subset of the data, then concatenating
the results together.</p>
<p>The following example shows how to create a Pandas UDF that computes the product of two columns.</p>
<div class="codetabs">
<div data-lang="python">
<div class="highlight"><pre class="codehilite"><code><span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">col</span><span class="p">,</span> <span class="n">pandas_udf</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="kn">import</span> <span class="n">LongType</span>
<span class="c1"># Declare the function and create the UDF
</span><span class="k">def</span> <span class="nf">multiply_func</span><span class="p">(</span><span class="n">a</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">:</span>
<span class="k">return</span> <span class="n">a</span> <span class="o">*</span> <span class="n">b</span>
<span class="n">multiply</span> <span class="o">=</span> <span class="n">pandas_udf</span><span class="p">(</span><span class="n">multiply_func</span><span class="p">,</span> <span class="n">returnType</span><span class="o">=</span><span class="n">LongType</span><span class="p">())</span>
<span class="c1"># The function for a pandas_udf should be able to execute with local Pandas data
</span><span class="n">x</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">([</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">])</span>
<span class="k">print</span><span class="p">(</span><span class="n">multiply_func</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">x</span><span class="p">))</span>
<span class="c1"># 0 1
# 1 4
# 2 9
# dtype: int64
</span>
<span class="c1"># Create a Spark DataFrame, 'spark' is an existing SparkSession
</span><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s">"x"</span><span class="p">]))</span>
<span class="c1"># Execute function as a Spark vectorized UDF
</span><span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">multiply</span><span class="p">(</span><span class="n">col</span><span class="p">(</span><span class="s">"x"</span><span class="p">),</span> <span class="n">col</span><span class="p">(</span><span class="s">"x"</span><span class="p">)))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div>
</div>
</div>
<p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf"><code class="highlighter-rouge">pyspark.sql.functions.pandas_udf</code></a></p>
<h3 id="iterator-of-series-to-iterator-of-series">Iterator of Series to Iterator of Series</h3>
<p>The type hint can be expressed as <code class="highlighter-rouge">Iterator[pandas.Series]</code> -&gt; <code class="highlighter-rouge">Iterator[pandas.Series]</code>.</p>
<p>By using <code class="highlighter-rouge">pandas_udf</code> with the function having such type hints above, it creates a Pandas UDF where the given
function takes an iterator of <code class="highlighter-rouge">pandas.Series</code> and outputs an iterator of <code class="highlighter-rouge">pandas.Series</code>. The
length of the entire output from the function should be the same length of the entire input; therefore, it can
prefetch the data from the input iterator as long as the lengths are the same.
In this case, the created Pandas UDF requires one input column when the Pandas UDF is called. To use
multiple input columns, a different type hint is required. See Iterator of Multiple Series to Iterator
of Series.</p>
<p>This variant is also useful when the UDF execution requires initializing some state, although internally it works
identically to the Series to Series case. The pseudocode below illustrates the usage.</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="o">@</span><span class="n">pandas_udf</span><span class="p">(</span><span class="s">"long"</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">calculate</span><span class="p">(</span><span class="n">iterator</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">]:</span>
<span class="c1"># Do some expensive initialization with a state
</span> <span class="n">state</span> <span class="o">=</span> <span class="n">very_expensive_initialization</span><span class="p">()</span>
<span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span>
<span class="c1"># Use that state for whole iterator.
</span> <span class="k">yield</span> <span class="n">calculate_with_state</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">state</span><span class="p">)</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">calculate</span><span class="p">(</span><span class="s">"value"</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span></code></pre></figure>
<p>The following example shows how to create this Pandas UDF:</p>
<div class="codetabs">
<div data-lang="python">
<div class="highlight"><pre class="codehilite"><code><span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Iterator</span>
<span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">pandas_udf</span>
<span class="n">pdf</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">([</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">],</span> <span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s">"x"</span><span class="p">])</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">pdf</span><span class="p">)</span>
<span class="c1"># Declare the function and create the UDF
</span><span class="o">@</span><span class="n">pandas_udf</span><span class="p">(</span><span class="s">"long"</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">plus_one</span><span class="p">(</span><span class="n">iterator</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">]:</span>
<span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">x</span> <span class="o">+</span> <span class="mi">1</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">plus_one</span><span class="p">(</span><span class="s">"x"</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div>
</div>
</div>
<p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf"><code class="highlighter-rouge">pyspark.sql.functions.pandas_udf</code></a></p>
<h3 id="iterator-of-multiple-series-to-iterator-of-series">Iterator of Multiple Series to Iterator of Series</h3>
<p>The type hint can be expressed as <code class="highlighter-rouge">Iterator[Tuple[pandas.Series, ...]]</code> -&gt; <code class="highlighter-rouge">Iterator[pandas.Series]</code>.</p>
<p>By using <code class="highlighter-rouge">pandas_udf</code> with the function having such type hints above, it creates a Pandas UDF where the
given function takes an iterator of a tuple of multiple <code class="highlighter-rouge">pandas.Series</code> and outputs an iterator of <code class="highlighter-rouge">pandas.Series</code>.
In this case, the created pandas UDF requires multiple input columns as many as the series in the tuple
when the Pandas UDF is called. Otherwise, it has the same characteristics and restrictions as Iterator of Series
to Iterator of Series case.</p>
<p>The following example shows how to create this Pandas UDF:</p>
<div class="codetabs">
<div data-lang="python">
<div class="highlight"><pre class="codehilite"><code><span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Iterator</span><span class="p">,</span> <span class="n">Tuple</span>
<span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">pandas_udf</span>
<span class="n">pdf</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">([</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">],</span> <span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s">"x"</span><span class="p">])</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">pdf</span><span class="p">)</span>
<span class="c1"># Declare the function and create the UDF
</span><span class="o">@</span><span class="n">pandas_udf</span><span class="p">(</span><span class="s">"long"</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">multiply_two_cols</span><span class="p">(</span>
<span class="n">iterator</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">]])</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">]:</span>
<span class="k">for</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">a</span> <span class="o">*</span> <span class="n">b</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">multiply_two_cols</span><span class="p">(</span><span class="s">"x"</span><span class="p">,</span> <span class="s">"x"</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div>
</div>
</div>
<p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf"><code class="highlighter-rouge">pyspark.sql.functions.pandas_udf</code></a></p>
<h3 id="series-to-scalar">Series to Scalar</h3>
<p>The type hint can be expressed as <code class="highlighter-rouge">pandas.Series</code>, &#8230; -&gt; <code class="highlighter-rouge">Any</code>.</p>
<p>By using <code class="highlighter-rouge">pandas_udf</code> with the function having such type hints above, it creates a Pandas UDF similar
to PySpark&#8217;s aggregate functions. The given function takes <code class="highlighter-rouge">pandas.Series</code> and returns a scalar value.
The return type should be a primitive data type, and the returned scalar can be either a python
primitive type, e.g., <code class="highlighter-rouge">int</code> or <code class="highlighter-rouge">float</code> or a numpy data type, e.g., <code class="highlighter-rouge">numpy.int64</code> or <code class="highlighter-rouge">numpy.float64</code>.
<code class="highlighter-rouge">Any</code> should ideally be a specific scalar type accordingly.</p>
<p>This UDF can be also used with <code class="highlighter-rouge">groupBy().agg()</code> and <a href="api/python/pyspark.sql.html#pyspark.sql.Window"><code class="highlighter-rouge">pyspark.sql.Window</code></a>.
It defines an aggregation from one or more <code class="highlighter-rouge">pandas.Series</code> to a scalar value, where each <code class="highlighter-rouge">pandas.Series</code>
represents a column within the group or window.</p>
<p>Note that this type of UDF does not support partial aggregation, and all data for a group or window
will be loaded into memory. Also, only unbounded windows are currently supported with grouped aggregate Pandas
UDFs. The following example shows how to use this type of UDF to compute the mean with group-by
and window operations:</p>
<div class="codetabs">
<div data-lang="python">
<div class="highlight"><pre class="codehilite"><code><span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">pandas_udf</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">Window</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span>
<span class="p">[(</span><span class="mi">1</span><span class="p">,</span> <span class="mf">1.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mf">2.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">3.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">5.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">10.0</span><span class="p">)],</span>
<span class="p">(</span><span class="s">"id"</span><span class="p">,</span> <span class="s">"v"</span><span class="p">))</span>
<span class="c1"># Declare the function and create the UDF
</span><span class="o">@</span><span class="n">pandas_udf</span><span class="p">(</span><span class="s">"double"</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">mean_udf</span><span class="p">(</span><span class="n">v</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">float</span><span class="p">:</span>
<span class="k">return</span> <span class="n">v</span><span class="o">.</span><span class="n">mean</span><span class="p">()</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">mean_udf</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s">'v'</span><span class="p">]))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
</span>
<span class="n">df</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s">"id"</span><span class="p">)</span><span class="o">.</span><span class="n">agg</span><span class="p">(</span><span class="n">mean_udf</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s">'v'</span><span class="p">]))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
</span>
<span class="n">w</span> <span class="o">=</span> <span class="n">Window</span> \
<span class="o">.</span><span class="n">partitionBy</span><span class="p">(</span><span class="s">'id'</span><span class="p">)</span> \
<span class="o">.</span><span class="n">rowsBetween</span><span class="p">(</span><span class="n">Window</span><span class="o">.</span><span class="n">unboundedPreceding</span><span class="p">,</span> <span class="n">Window</span><span class="o">.</span><span class="n">unboundedFollowing</span><span class="p">)</span>
<span class="n">df</span><span class="o">.</span><span class="n">withColumn</span><span class="p">(</span><span class="s">'mean_v'</span><span class="p">,</span> <span class="n">mean_udf</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s">'v'</span><span class="p">])</span><span class="o">.</span><span class="n">over</span><span class="p">(</span><span class="n">w</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div>
</div>
</div>
<p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf"><code class="highlighter-rouge">pyspark.sql.functions.pandas_udf</code></a></p>
<h2 id="pandas-function-apis">Pandas Function APIs</h2>
<p>Pandas Function APIs can directly apply a Python native function against the whole <code class="highlighter-rouge">DataFrame</code> by
using Pandas instances. Internally they work similarly to Pandas UDFs, using Arrow to transfer
data and Pandas to work with the data, which allows vectorized operations. However, a Pandas Function
API behaves as a regular API under the PySpark <code class="highlighter-rouge">DataFrame</code> instead of <code class="highlighter-rouge">Column</code>, and Python type hints in Pandas
Function APIs are optional and do not affect how they work internally at this moment, although they
might be required in the future.</p>
<p>From Spark 3.0, the grouped map Pandas UDF is categorized as a separate Pandas Function API,
<code class="highlighter-rouge">DataFrame.groupby().applyInPandas()</code>. It is still possible to use it with <code class="highlighter-rouge">PandasUDFType</code>
and <code class="highlighter-rouge">DataFrame.groupby().apply()</code> as before; however, it is preferred to use
<code class="highlighter-rouge">DataFrame.groupby().applyInPandas()</code> directly. <code class="highlighter-rouge">PandasUDFType</code> will be deprecated
in a future release.</p>
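<p>For illustration, the sketch below contrasts the two styles. It is a minimal example and not part of the example file; it assumes an existing <code class="highlighter-rouge">SparkSession</code> named <code class="highlighter-rouge">spark</code>, and the full preferred form is shown in the Grouped Map section below:</p>
<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

# Spark 2.x style: a grouped map Pandas UDF registered with PandasUDFType
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean_udf(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby("id").apply(subtract_mean_udf).show()

# Preferred Spark 3.0 style: a plain Python function with applyInPandas
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
</code></pre></div></div>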
<h3 id="grouped-map">Grouped Map</h3>
<p>Grouped map operations with Pandas instances are supported by <code class="highlighter-rouge">DataFrame.groupby().applyInPandas()</code>,
which requires a Python function that takes a <code class="highlighter-rouge">pandas.DataFrame</code> and returns another <code class="highlighter-rouge">pandas.DataFrame</code>.
Each group is mapped to a <code class="highlighter-rouge">pandas.DataFrame</code> in the Python function.</p>
<p>This API implements the &#8220;split-apply-combine&#8221; pattern which consists of three steps:</p>
<ul>
<li>Split the data into groups by using <code class="highlighter-rouge">DataFrame.groupBy</code>.</li>
<li>Apply a function on each group. The input and output of the function are both <code class="highlighter-rouge">pandas.DataFrame</code>. The
input data contains all the rows and columns for each group.</li>
<li>Combine the results into a new PySpark <code class="highlighter-rouge">DataFrame</code>.</li>
</ul>
<p>To use <code class="highlighter-rouge">groupBy().applyInPandas()</code>, the user needs to define the following:</p>
<ul>
<li>A Python function that defines the computation for each group.</li>
<li>A <code class="highlighter-rouge">StructType</code> object or a string that defines the schema of the output PySpark <code class="highlighter-rouge">DataFrame</code>.</li>
</ul>
<p>The column labels of the returned <code class="highlighter-rouge">pandas.DataFrame</code> must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See <a href="https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame">pandas.DataFrame</a>
on how to label columns when constructing a <code class="highlighter-rouge">pandas.DataFrame</code>.</p>
<p>Note that all data for a group will be loaded into memory before the function is applied. This can
lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
<a href="#setting-arrow-batch-size">maxRecordsPerBatch</a> is not applied on groups and it is up to the user
to ensure that the grouped data will fit into the available memory.</p>
<p>The following example shows how to use <code class="highlighter-rouge">groupby().applyInPandas()</code> to subtract the mean from each value
in the group.</p>
<div class="codetabs">
<div data-lang="python">
<div class="highlight"><pre class="codehilite"><code><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span>
<span class="p">[(</span><span class="mi">1</span><span class="p">,</span> <span class="mf">1.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mf">2.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">3.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">5.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">10.0</span><span class="p">)],</span>
<span class="p">(</span><span class="s">"id"</span><span class="p">,</span> <span class="s">"v"</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">subtract_mean</span><span class="p">(</span><span class="n">pdf</span><span class="p">):</span>
<span class="c1"># pdf is a pandas.DataFrame
</span> <span class="n">v</span> <span class="o">=</span> <span class="n">pdf</span><span class="o">.</span><span class="n">v</span>
<span class="k">return</span> <span class="n">pdf</span><span class="o">.</span><span class="n">assign</span><span class="p">(</span><span class="n">v</span><span class="o">=</span><span class="n">v</span> <span class="o">-</span> <span class="n">v</span><span class="o">.</span><span class="n">mean</span><span class="p">())</span>
<span class="n">df</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s">"id"</span><span class="p">)</span><span class="o">.</span><span class="n">applyInPandas</span><span class="p">(</span><span class="n">subtract_mean</span><span class="p">,</span> <span class="n">schema</span><span class="o">=</span><span class="s">"id long, v double"</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div>
</div>
</div>
<p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.GroupedData.applyInPandas"><code class="highlighter-rouge">pyspark.sql.GroupedData.applyInPandas</code></a>.</p>
<h3 id="map">Map</h3>
<p>Map operations with Pandas instances are supported by <code class="highlighter-rouge">DataFrame.mapInPandas()</code>, which maps an iterator
of <code class="highlighter-rouge">pandas.DataFrame</code>s to another iterator of <code class="highlighter-rouge">pandas.DataFrame</code>s that represents the current
PySpark <code class="highlighter-rouge">DataFrame</code>, and returns the result as a PySpark <code class="highlighter-rouge">DataFrame</code>. The function takes and outputs
an iterator of <code class="highlighter-rouge">pandas.DataFrame</code>. It can return output of arbitrary length, in contrast to some
Pandas UDFs, although internally it works similarly to the Series to Series Pandas UDF.</p>
<p>The following example shows how to use <code class="highlighter-rouge">mapInPandas()</code>:</p>
<div class="codetabs">
<div data-lang="python">
<div class="highlight"><pre class="codehilite"><code><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">([(</span><span class="mi">1</span><span class="p">,</span> <span class="mi">21</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mi">30</span><span class="p">)],</span> <span class="p">(</span><span class="s">"id"</span><span class="p">,</span> <span class="s">"age"</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">filter_func</span><span class="p">(</span><span class="n">iterator</span><span class="p">):</span>
<span class="k">for</span> <span class="n">pdf</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">pdf</span><span class="p">[</span><span class="n">pdf</span><span class="o">.</span><span class="nb">id</span> <span class="o">==</span> <span class="mi">1</span><span class="p">]</span>
<span class="n">df</span><span class="o">.</span><span class="n">mapInPandas</span><span class="p">(</span><span class="n">filter_func</span><span class="p">,</span> <span class="n">schema</span><span class="o">=</span><span class="n">df</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div>
</div>
</div>
<p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas"><code class="highlighter-rouge">pyspark.sql.DataFrame.mapsInPandas</code></a>.</p>
<h3 id="co-grouped-map">Co-grouped Map</h3>
<p>Co-grouped map operations with Pandas instances are supported by <code class="highlighter-rouge">DataFrame.groupby().cogroup().applyInPandas()</code> which
allows two PySpark <code class="highlighter-rouge">DataFrame</code>s to be cogrouped by a common key and then a Python function applied to each
cogroup. It consists of the following steps:</p>
<ul>
<li>Shuffle the data such that the groups of each dataframe which share a key are cogrouped together.</li>
<li>Apply a function to each cogroup. The input of the function is two <code class="highlighter-rouge">pandas.DataFrame</code> (with an optional tuple
representing the key). The output of the function is a <code class="highlighter-rouge">pandas.DataFrame</code>.</li>
<li>Combine the <code class="highlighter-rouge">pandas.DataFrame</code>s from all groups into a new PySpark <code class="highlighter-rouge">DataFrame</code>.</li>
</ul>
<p>To use <code class="highlighter-rouge">groupBy().cogroup().applyInPandas()</code>, the user needs to define the following:</p>
<ul>
<li>A Python function that defines the computation for each cogroup.</li>
<li>A <code class="highlighter-rouge">StructType</code> object or a string that defines the schema of the output PySpark <code class="highlighter-rouge">DataFrame</code>.</li>
</ul>
<p>The column labels of the returned <code class="highlighter-rouge">pandas.DataFrame</code> must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See <a href="https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame">pandas.DataFrame</a>
on how to label columns when constructing a <code class="highlighter-rouge">pandas.DataFrame</code>.</p>
<p>Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of
memory exceptions, especially if the group sizes are skewed. The configuration for <a href="#setting-arrow-batch-size">maxRecordsPerBatch</a>
is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.</p>
<p>The following example shows how to use <code class="highlighter-rouge">groupby().cogroup().applyInPandas()</code> to perform an asof join between two datasets.</p>
<div class="codetabs">
<div data-lang="python">
<div class="highlight"><pre class="codehilite"><code><span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span>
<span class="n">df1</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span>
<span class="p">[(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mf">1.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mf">2.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000102</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mf">3.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000102</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mf">4.0</span><span class="p">)],</span>
<span class="p">(</span><span class="s">"time"</span><span class="p">,</span> <span class="s">"id"</span><span class="p">,</span> <span class="s">"v1"</span><span class="p">))</span>
<span class="n">df2</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span>
<span class="p">[(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="s">"x"</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="s">"y"</span><span class="p">)],</span>
<span class="p">(</span><span class="s">"time"</span><span class="p">,</span> <span class="s">"id"</span><span class="p">,</span> <span class="s">"v2"</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">asof_join</span><span class="p">(</span><span class="n">l</span><span class="p">,</span> <span class="n">r</span><span class="p">):</span>
<span class="k">return</span> <span class="n">pd</span><span class="o">.</span><span class="n">merge_asof</span><span class="p">(</span><span class="n">l</span><span class="p">,</span> <span class="n">r</span><span class="p">,</span> <span class="n">on</span><span class="o">=</span><span class="s">"time"</span><span class="p">,</span> <span class="n">by</span><span class="o">=</span><span class="s">"id"</span><span class="p">)</span>
<span class="n">df1</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s">"id"</span><span class="p">)</span><span class="o">.</span><span class="n">cogroup</span><span class="p">(</span><span class="n">df2</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s">"id"</span><span class="p">))</span><span class="o">.</span><span class="n">applyInPandas</span><span class="p">(</span>
<span class="n">asof_join</span><span class="p">,</span> <span class="n">schema</span><span class="o">=</span><span class="s">"time int, id int, v1 double, v2 string"</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div>
</div>
</div>
<p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.PandasCogroupedOps.applyInPandas"><code class="highlighter-rouge">pyspark.sql.PandasCogroupedOps.applyInPandas()</code></a>.</p>
<h2 id="usage-notes">Usage Notes</h2>
<h3 id="supported-sql-types">Supported SQL Types</h3>
<p>Currently, all Spark SQL data types are supported by Arrow-based conversion except <code class="highlighter-rouge">MapType</code>,
<code class="highlighter-rouge">ArrayType</code> of <code class="highlighter-rouge">TimestampType</code>, and nested <code class="highlighter-rouge">StructType</code>.</p>
<h3 id="setting-arrow-batch-size">Setting Arrow Batch Size</h3>
<p>Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
record batches can be adjusted by setting the conf &#8220;spark.sql.execution.arrow.maxRecordsPerBatch&#8221;
to an integer that will determine the maximum number of rows for each batch. The default value is
10,000 records per batch. If the number of columns is large, the value should be adjusted
accordingly. Using this limit, each data partition will be made into one or more record batches for
processing.</p>
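<p>For example, to lower the batch size when each row is very wide (a minimal sketch, assuming an existing <code class="highlighter-rouge">SparkSession</code> named <code class="highlighter-rouge">spark</code>; the value shown is illustrative only):</p>
<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code># Limit each Arrow record batch to at most 5,000 rows instead of the default 10,000
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
</code></pre></div></div>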
<h3 id="timestamp-with-time-zone-semantics">Timestamp with Time Zone Semantics</h3>
<p>Spark internally stores timestamps as UTC values, and timestamp data that is brought in without
a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp
data is exported or displayed in Spark, the session time zone is used to localize the timestamp
values. The session time zone is set with the configuration &#8216;spark.sql.session.timeZone&#8217; and will
default to the JVM system local time zone if not set. Pandas uses a <code class="highlighter-rouge">datetime64</code> type with nanosecond
resolution, <code class="highlighter-rouge">datetime64[ns]</code>, with optional time zone on a per-column basis.</p>
<p>When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds
and each column will be converted to the Spark session time zone then localized to that time
zone, which removes the time zone and displays values as local time. This will occur
when calling <code class="highlighter-rouge">toPandas()</code> or <code class="highlighter-rouge">pandas_udf</code> with timestamp columns.</p>
<p>When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This
occurs when calling <code class="highlighter-rouge">createDataFrame</code> with a Pandas DataFrame or when returning a timestamp from a
<code class="highlighter-rouge">pandas_udf</code>. These conversions are done automatically to ensure Spark will have data in the
expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond
values will be truncated.</p>
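<p>The following minimal sketch (assuming an existing <code class="highlighter-rouge">SparkSession</code> named <code class="highlighter-rouge">spark</code>; the column name and time zone are illustrative only) shows the session time zone being set before moving timestamp data between Pandas and Spark:</p>
<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import pandas as pd

# The session time zone is used to localize timestamps on toPandas()
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

pdf = pd.DataFrame({"ts": pd.to_datetime(["2020-01-01 12:00:00"])})

# Pandas -&gt; Spark: converted to UTC microseconds; nanosecond values are truncated
df = spark.createDataFrame(pdf)

# Spark -&gt; Pandas: returned as tz-naive datetime64[ns] values in the session time zone
result_pdf = df.toPandas()
</code></pre></div></div>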
<p>Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
different from a Pandas timestamp. It is recommended to use Pandas time series functionality when
working with timestamps in <code class="highlighter-rouge">pandas_udf</code>s to get the best performance; see
<a href="https://pandas.pydata.org/pandas-docs/stable/timeseries.html">here</a> for details.</p>
<h3 id="recommended-pandas-and-pyarrow-versions">Recommended Pandas and PyArrow Versions</h3>
<p>For usage with pyspark.sql, the supported versions are Pandas 0.24.2 and PyArrow 0.15.1. Higher
versions may be used; however, compatibility and data correctness cannot be guaranteed and should
be verified by the user.</p>
<h3 id="compatibility-setting-for-pyarrow--0150-and-spark-23x-24x">Compatibility Setting for PyArrow &gt;= 0.15.0 and Spark 2.3.x, 2.4.x</h3>
<p>Since Arrow 0.15.0, a change in the binary IPC format requires an environment variable to remain
compatible with previous versions of Arrow &lt;= 0.14.1. This is only necessary for PySpark
users on Spark 2.3.x and 2.4.x who have manually upgraded PyArrow to 0.15.0. The following
can be added to <code class="highlighter-rouge">conf/spark-env.sh</code> to use the legacy Arrow IPC format:</p>
<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>ARROW_PRE_0_15_IPC_FORMAT=1
</code></pre></div></div>
<p>This will instruct PyArrow &gt;= 0.15.0 to use the legacy IPC format with the older Arrow Java that
is in Spark 2.3.x and 2.4.x. Not setting this environment variable will lead to a similar error as
described in <a href="https://issues.apache.org/jira/browse/SPARK-29367">SPARK-29367</a> when running
<code class="highlighter-rouge">pandas_udf</code>s or <code class="highlighter-rouge">toPandas()</code> with Arrow enabled. More information about the Arrow IPC change can
be read on the Arrow 0.15.0 release <a href="http://arrow.apache.org/blog/2019/10/06/0.15.0-release/#columnar-streaming-protocol-change-since-0140">blog</a>.</p>
</div>
<!-- /container -->
</div>
<script src="js/vendor/jquery-3.4.1.min.js"></script>
<script src="js/vendor/bootstrap.min.js"></script>
<script src="js/vendor/anchor.min.js"></script>
<script src="js/main.js"></script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
TeX: { equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script>
// Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
// We could use "//cdn.mathjax...", but that won't support "file://".
(function(d, script) {
script = d.createElement('script');
script.type = 'text/javascript';
script.async = true;
script.onload = function(){
MathJax.Hub.Config({
tex2jax: {
inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
processEscapes: true,
skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
}
});
};
script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' +
'?config=TeX-AMS-MML_HTMLorMML';
d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>