| |
| <!DOCTYPE html> |
| <!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]--> |
| <!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]--> |
| <!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> |
| <title>PySpark Usage Guide for Pandas with Apache Arrow - Spark 3.0.0 Documentation</title> |
| |
| |
| |
| |
| <link rel="stylesheet" href="css/bootstrap.min.css"> |
| <style> |
| body { |
| padding-top: 60px; |
| padding-bottom: 40px; |
| } |
| </style> |
| <meta name="viewport" content="width=device-width"> |
| <link rel="stylesheet" href="css/bootstrap-responsive.min.css"> |
| <link rel="stylesheet" href="css/main.css"> |
| |
| <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> |
| |
| <link rel="stylesheet" href="css/pygments-default.css"> |
| |
| |
| <!-- Google analytics script --> |
| <script type="text/javascript"> |
| var _gaq = _gaq || []; |
| _gaq.push(['_setAccount', 'UA-32518208-2']); |
| _gaq.push(['_trackPageview']); |
| |
| (function() { |
| var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true; |
| ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js'; |
| var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s); |
| })(); |
| </script> |
| |
| |
| </head> |
| <body> |
| <!--[if lt IE 7]> |
| <p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p> |
| <![endif]--> |
| |
| <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html --> |
| |
| <div class="navbar navbar-fixed-top" id="topbar"> |
| <div class="navbar-inner"> |
| <div class="container"> |
| <div class="brand"><a href="index.html"> |
| <img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">3.0.0</span> |
| </div> |
| <ul class="nav"> |
| <!--TODO(andyk): Add class="active" attribute to li some how.--> |
| <li><a href="index.html">Overview</a></li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="quick-start.html">Quick Start</a></li> |
| <li><a href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a></li> |
| <li><a href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a></li> |
| <li><a href="structured-streaming-programming-guide.html">Structured Streaming</a></li> |
| <li><a href="streaming-programming-guide.html">Spark Streaming (DStreams)</a></li> |
| <li><a href="ml-guide.html">MLlib (Machine Learning)</a></li> |
| <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li> |
| <li><a href="sparkr.html">SparkR (R on Spark)</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="api/scala/org/apache/spark/index.html">Scala</a></li> |
| <li><a href="api/java/index.html">Java</a></li> |
| <li><a href="api/python/index.html">Python</a></li> |
| <li><a href="api/R/index.html">R</a></li> |
| <li><a href="api/sql/index.html">SQL, Built-in Functions</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="cluster-overview.html">Overview</a></li> |
| <li><a href="submitting-applications.html">Submitting Applications</a></li> |
| <li class="divider"></li> |
| <li><a href="spark-standalone.html">Spark Standalone</a></li> |
| <li><a href="running-on-mesos.html">Mesos</a></li> |
| <li><a href="running-on-yarn.html">YARN</a></li> |
| <li><a href="running-on-kubernetes.html">Kubernetes</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="configuration.html">Configuration</a></li> |
| <li><a href="monitoring.html">Monitoring</a></li> |
| <li><a href="tuning.html">Tuning Guide</a></li> |
| <li><a href="job-scheduling.html">Job Scheduling</a></li> |
| <li><a href="security.html">Security</a></li> |
| <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li> |
| <li><a href="migration-guide.html">Migration Guide</a></li> |
| <li class="divider"></li> |
| <li><a href="building-spark.html">Building Spark</a></li> |
| <li><a href="https://spark.apache.org/contributing.html">Contributing to Spark</a></li> |
| <li><a href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a></li> |
| </ul> |
| </li> |
| </ul> |
| <!--<p class="navbar-text pull-right"><span class="version-text">v3.0.0</span></p>--> |
| </div> |
| </div> |
| </div> |
| |
| <div class="container-wrapper"> |
| |
| |
| |
| <div class="left-menu-wrapper"> |
| <div class="left-menu"> |
| <h3><a href="sql-programming-guide.html">Spark SQL Guide</a></h3> |
| |
| <ul> |
| |
| <li> |
| <a href="sql-getting-started.html"> |
| |
| Getting Started |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-data-sources.html"> |
| |
| Data Sources |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-performance-tuning.html"> |
| |
| Performance Tuning |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-distributed-sql-engine.html"> |
| |
| Distributed SQL Engine |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-pyspark-pandas-with-arrow.html"> |
| |
| <b>PySpark Usage Guide for Pandas with Apache Arrow</b> |
| |
| </a> |
| </li> |
| |
| |
| |
| <ul> |
| |
| <li> |
| <a href="sql-pyspark-pandas-with-arrow.html#apache-arrow-in-spark"> |
| |
| Apache Arrow in Spark |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-pyspark-pandas-with-arrow.html#enabling-for-conversion-tofrom-pandas"> |
| |
| Enabling for Conversion to/from Pandas |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs"> |
| |
| Pandas UDFs (a.k.a. Vectorized UDFs) |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-pyspark-pandas-with-arrow.html#pandas-function-apis"> |
| |
| Pandas Function APIs |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-pyspark-pandas-with-arrow.html#usage-notes"> |
| |
| Usage Notes |
| |
| </a> |
| </li> |
| |
| |
| |
| </ul> |
| |
| |
| |
| <li> |
| <a href="sql-migration-old.html"> |
| |
| Migration Guide |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-ref.html"> |
| |
| SQL Reference |
| |
| </a> |
| </li> |
| |
| |
| |
| </ul> |
| |
| </div> |
| </div> |
| |
| <input id="nav-trigger" class="nav-trigger" checked type="checkbox"> |
| <label for="nav-trigger"></label> |
| <div class="content-with-sidebar" id="content"> |
| |
| <h1 class="title">PySpark Usage Guide for Pandas with Apache Arrow</h1> |
| |
| |
| <ul id="markdown-toc"> |
| <li><a href="#apache-arrow-in-pyspark" id="markdown-toc-apache-arrow-in-pyspark">Apache Arrow in PySpark</a> <ul> |
| <li><a href="#ensure-pyarrow-installed" id="markdown-toc-ensure-pyarrow-installed">Ensure PyArrow Installed</a></li> |
| </ul> |
| </li> |
| <li><a href="#enabling-for-conversion-tofrom-pandas" id="markdown-toc-enabling-for-conversion-tofrom-pandas">Enabling for Conversion to/from Pandas</a></li> |
| <li><a href="#pandas-udfs-aka-vectorized-udfs" id="markdown-toc-pandas-udfs-aka-vectorized-udfs">Pandas UDFs (a.k.a. Vectorized UDFs)</a> <ul> |
| <li><a href="#series-to-series" id="markdown-toc-series-to-series">Series to Series</a></li> |
| <li><a href="#iterator-of-series-to-iterator-of-series" id="markdown-toc-iterator-of-series-to-iterator-of-series">Iterator of Series to Iterator of Series</a></li> |
| <li><a href="#iterator-of-multiple-series-to-iterator-of-series" id="markdown-toc-iterator-of-multiple-series-to-iterator-of-series">Iterator of Multiple Series to Iterator of Series</a></li> |
| <li><a href="#series-to-scalar" id="markdown-toc-series-to-scalar">Series to Scalar</a></li> |
| </ul> |
| </li> |
| <li><a href="#pandas-function-apis" id="markdown-toc-pandas-function-apis">Pandas Function APIs</a> <ul> |
| <li><a href="#grouped-map" id="markdown-toc-grouped-map">Grouped Map</a></li> |
| <li><a href="#map" id="markdown-toc-map">Map</a></li> |
| <li><a href="#co-grouped-map" id="markdown-toc-co-grouped-map">Co-grouped Map</a></li> |
| </ul> |
| </li> |
| <li><a href="#usage-notes" id="markdown-toc-usage-notes">Usage Notes</a> <ul> |
| <li><a href="#supported-sql-types" id="markdown-toc-supported-sql-types">Supported SQL Types</a></li> |
| <li><a href="#setting-arrow-batch-size" id="markdown-toc-setting-arrow-batch-size">Setting Arrow Batch Size</a></li> |
| <li><a href="#timestamp-with-time-zone-semantics" id="markdown-toc-timestamp-with-time-zone-semantics">Timestamp with Time Zone Semantics</a></li> |
| <li><a href="#recommended-pandas-and-pyarrow-versions" id="markdown-toc-recommended-pandas-and-pyarrow-versions">Recommended Pandas and PyArrow Versions</a></li> |
| <li><a href="#compatibility-setting-for-pyarrow--0150-and-spark-23x-24x" id="markdown-toc-compatibility-setting-for-pyarrow--0150-and-spark-23x-24x">Compatibility Setting for PyArrow >= 0.15.0 and Spark 2.3.x, 2.4.x</a></li> |
| </ul> |
| </li> |
| </ul> |
| |
| <h2 id="apache-arrow-in-pyspark">Apache Arrow in PySpark</h2> |
| |
| <p>Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer |
| data between JVM and Python processes. It is currently most beneficial to Python users who |
| work with Pandas/NumPy data. Its usage is not automatic and might require some minor |
| changes to configuration or code to take full advantage and ensure compatibility. This guide |
| gives a high-level description of how to use Arrow in Spark and highlights any differences when |
| working with Arrow-enabled data.</p> |
| |
| <h3 id="ensure-pyarrow-installed">Ensure PyArrow Installed</h3> |
| |
| <p>To use Apache Arrow in PySpark, <a href="#recommended-pandas-and-pyarrow-versions">the recommended version of PyArrow</a> |
| should be installed. |
| If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the |
| SQL module with the command <code class="highlighter-rouge">pip install pyspark[sql]</code>. Otherwise, you must ensure that PyArrow |
| is installed and available on all cluster nodes. |
| You can install it using pip, or with conda from the conda-forge channel. See the PyArrow |
| <a href="https://arrow.apache.org/docs/python/install.html">installation</a> guide for details.</p> |
| |
| <h2 id="enabling-for-conversion-tofrom-pandas">Enabling for Conversion to/from Pandas</h2> |
| |
| <p>Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame |
| using the call <code class="highlighter-rouge">toPandas()</code> and when creating a Spark DataFrame from a Pandas DataFrame with |
| <code class="highlighter-rouge">createDataFrame(pandas_df)</code>. To use Arrow when executing these calls, users need to first set |
| the Spark configuration <code class="highlighter-rouge">spark.sql.execution.arrow.pyspark.enabled</code> to <code class="highlighter-rouge">true</code>. This is disabled by default.</p> |
| |
| <p>In addition, optimizations enabled by <code class="highlighter-rouge">spark.sql.execution.arrow.pyspark.enabled</code> can fall back automatically |
| to the non-Arrow implementation if an error occurs before the actual computation within Spark. |
| This behavior is controlled by <code class="highlighter-rouge">spark.sql.execution.arrow.pyspark.fallback.enabled</code>.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="python"> |
| <div class="highlight"><pre class="codehilite"><code><span class="kn">import</span> <span class="nn">numpy</span> <span class="k">as</span> <span class="n">np</span> |
| <span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span> |
| |
| <span class="c1"># Enable Arrow-based columnar data transfers |
| </span><span class="n">spark</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="nb">set</span><span class="p">(</span><span class="s">"spark.sql.execution.arrow.pyspark.enabled"</span><span class="p">,</span> <span class="s">"true"</span><span class="p">)</span> |
| |
| <span class="c1"># Generate a Pandas DataFrame |
| </span><span class="n">pdf</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">(</span><span class="n">np</span><span class="o">.</span><span class="n">random</span><span class="o">.</span><span class="n">rand</span><span class="p">(</span><span class="mi">100</span><span class="p">,</span> <span class="mi">3</span><span class="p">))</span> |
| |
| <span class="c1"># Create a Spark DataFrame from a Pandas DataFrame using Arrow |
| </span><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">pdf</span><span class="p">)</span> |
| |
| <span class="c1"># Convert the Spark DataFrame back to a Pandas DataFrame using Arrow |
| </span><span class="n">result_pdf</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s">"*"</span><span class="p">)</span><span class="o">.</span><span class="n">toPandas</span><span class="p">()</span></code></pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div> |
| </div> |
| </div> |
| |
| <p>Using the above optimizations with Arrow will produce the same results as when Arrow is not |
| enabled. Note that even with Arrow, <code class="highlighter-rouge">toPandas()</code> results in the collection of all records in the |
| DataFrame to the driver program and should be done on a small subset of the data. Not all Spark |
| data types are currently supported, and an error can be raised if a column has an unsupported type; |
| see <a href="#supported-sql-types">Supported SQL Types</a>. If an error occurs during <code class="highlighter-rouge">createDataFrame()</code>, |
| Spark will fall back to creating the DataFrame without Arrow.</p> |
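| |
| <p>For example, if you would rather see Arrow errors than have Spark silently fall back to the non-Arrow code path, the fallback can be disabled explicitly. This is a minimal sketch using the configuration keys described above and assumes an existing SparkSession named <code class="highlighter-rouge">spark</code>; whether failing fast is desirable depends on your workload.</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"># Require Arrow for Pandas conversions and raise errors instead of |
| # silently falling back to the non-Arrow implementation. |
| spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") |
| spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")</code></pre></figure> |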
| |
| <h2 id="pandas-udfs-aka-vectorized-udfs">Pandas UDFs (a.k.a. Vectorized UDFs)</h2> |
| |
| <p>Pandas UDFs are user-defined functions that are executed by Spark using |
| Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas |
| UDF is defined using <code class="highlighter-rouge">pandas_udf</code> as a decorator or to wrap the function, and no additional |
| configuration is required. In general, a Pandas UDF behaves like a regular PySpark function API.</p> |
| |
| <p>Before Spark 3.0, Pandas UDFs used to be defined with <code class="highlighter-rouge">PandasUDFType</code>. From Spark 3.0 |
| with Python 3.6+, you can also use <a href="https://www.python.org/dev/peps/pep-0484">Python type hints</a>. |
| Using Python type hints is preferred, and <code class="highlighter-rouge">PandasUDFType</code> will be deprecated in |
| a future release.</p> |
| |
| <p>Note that the type hint should use <code class="highlighter-rouge">pandas.Series</code> in all cases except one variant: |
| <code class="highlighter-rouge">pandas.DataFrame</code> should be used as the input or output type hint instead when the input |
| or output column is of <code class="highlighter-rouge">StructType</code>. The following example shows a Pandas UDF which takes a long |
| column, a string column and a struct column, and outputs a struct column. It requires the function to |
| specify the type hints with <code class="highlighter-rouge">pandas.Series</code> and <code class="highlighter-rouge">pandas.DataFrame</code> as below:</p> |
| |
| <p> |
| <div class="codetabs"> |
| <div data-lang="python"> |
| <div class="highlight"><pre class="codehilite"><code><span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span> |
| |
| <span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">pandas_udf</span> |
| |
| <span class="o">@</span><span class="n">pandas_udf</span><span class="p">(</span><span class="s">"col1 string, col2 long"</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">func</span><span class="p">(</span><span class="n">s1</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">,</span> <span class="n">s2</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">,</span> <span class="n">s3</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span> <span class="o">-></span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">:</span> |
| <span class="n">s3</span><span class="p">[</span><span class="s">'col2'</span><span class="p">]</span> <span class="o">=</span> <span class="n">s1</span> <span class="o">+</span> <span class="n">s2</span><span class="o">.</span><span class="nb">str</span><span class="o">.</span><span class="nb">len</span><span class="p">()</span> |
| <span class="k">return</span> <span class="n">s3</span> |
| |
| <span class="c1"># Create a Spark DataFrame that has three columns including a sturct column. |
| </span><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span> |
| <span class="p">[[</span><span class="mi">1</span><span class="p">,</span> <span class="s">"a string"</span><span class="p">,</span> <span class="p">(</span><span class="s">"a nested string"</span><span class="p">,)]],</span> |
| <span class="s">"long_col long, string_col string, struct_col struct<col1:string>"</span><span class="p">)</span> |
| |
| <span class="n">df</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span> |
| <span class="c1"># root |
| # |-- long_col: long (nullable = true) |
| # |-- string_col: string (nullable = true) |
| # |-- struct_col: struct (nullable = true) |
| # | |-- col1: string (nullable = true) |
| </span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">func</span><span class="p">(</span><span class="s">"long_col"</span><span class="p">,</span> <span class="s">"string_col"</span><span class="p">,</span> <span class="s">"struct_col"</span><span class="p">))</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span> |
| <span class="c1"># |-- func(long_col, string_col, struct_col): struct (nullable = true) |
| # | |-- col1: string (nullable = true) |
| # | |-- col2: long (nullable = true)</span></code></pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div> |
| </div> |
| </div> |
| </p> |
| |
| <p>The following sections describe the combinations of supported type hints. For simplicity, |
| the <code class="highlighter-rouge">pandas.DataFrame</code> variant is omitted.</p> |
| |
| <h3 id="series-to-series">Series to Series</h3> |
| |
| <p>The type hint can be expressed as <code class="highlighter-rouge">pandas.Series</code>, … -> <code class="highlighter-rouge">pandas.Series</code>.</p> |
| |
| <p>Using <code class="highlighter-rouge">pandas_udf</code> with a function that has such type hints creates a Pandas UDF where the given |
| function takes one or more <code class="highlighter-rouge">pandas.Series</code> and outputs one <code class="highlighter-rouge">pandas.Series</code>. The output of the function should |
| always be of the same length as the input. Internally, PySpark executes a Pandas UDF by splitting |
| columns into batches, calling the function for each batch as a subset of the data, and then concatenating |
| the results together.</p> |
| |
| <p>The following example shows how to create a Pandas UDF that computes the product of 2 columns.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="python"> |
| <div class="highlight"><pre class="codehilite"><code><span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span> |
| |
| <span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">col</span><span class="p">,</span> <span class="n">pandas_udf</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="kn">import</span> <span class="n">LongType</span> |
| |
| <span class="c1"># Declare the function and create the UDF |
| </span><span class="k">def</span> <span class="nf">multiply_func</span><span class="p">(</span><span class="n">a</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">)</span> <span class="o">-></span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">a</span> <span class="o">*</span> <span class="n">b</span> |
| |
| <span class="n">multiply</span> <span class="o">=</span> <span class="n">pandas_udf</span><span class="p">(</span><span class="n">multiply_func</span><span class="p">,</span> <span class="n">returnType</span><span class="o">=</span><span class="n">LongType</span><span class="p">())</span> |
| |
| <span class="c1"># The function for a pandas_udf should be able to execute with local Pandas data |
| </span><span class="n">x</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">([</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">])</span> |
| <span class="k">print</span><span class="p">(</span><span class="n">multiply_func</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">x</span><span class="p">))</span> |
| <span class="c1"># 0 1 |
| # 1 4 |
| # 2 9 |
| # dtype: int64 |
| </span> |
| <span class="c1"># Create a Spark DataFrame, 'spark' is an existing SparkSession |
| </span><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s">"x"</span><span class="p">]))</span> |
| |
| <span class="c1"># Execute function as a Spark vectorized UDF |
| </span><span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">multiply</span><span class="p">(</span><span class="n">col</span><span class="p">(</span><span class="s">"x"</span><span class="p">),</span> <span class="n">col</span><span class="p">(</span><span class="s">"x"</span><span class="p">)))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +-------------------+ |
| # |multiply_func(x, x)| |
| # +-------------------+ |
| # | 1| |
| # | 4| |
| # | 9| |
| # +-------------------+</span></code></pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div> |
| </div> |
| </div> |
| |
| <p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf"><code class="highlighter-rouge">pyspark.sql.functions.pandas_udf</code></a>.</p> |
| |
| <h3 id="iterator-of-series-to-iterator-of-series">Iterator of Series to Iterator of Series</h3> |
| |
| <p>The type hint can be expressed as <code class="highlighter-rouge">Iterator[pandas.Series]</code> -> <code class="highlighter-rouge">Iterator[pandas.Series]</code>.</p> |
| |
| <p>Using <code class="highlighter-rouge">pandas_udf</code> with a function that has such type hints creates a Pandas UDF where the given |
| function takes an iterator of <code class="highlighter-rouge">pandas.Series</code> and outputs an iterator of <code class="highlighter-rouge">pandas.Series</code>. The |
| length of the entire output from the function should be the same as the length of the entire input; because this is |
| guaranteed, Spark can prefetch the data from the input iterator. |
| In this case, the created Pandas UDF requires one input column when the Pandas UDF is called. To use |
| multiple input columns, a different type hint is required. See Iterator of Multiple Series to Iterator |
| of Series.</p> |
| |
| <p>This variant is also useful when the UDF execution requires initializing some state, although internally it works |
| identically to the Series to Series case. The pseudocode below illustrates the example.</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="o">@</span><span class="n">pandas_udf</span><span class="p">(</span><span class="s">"long"</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">calculate</span><span class="p">(</span><span class="n">iterator</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">])</span> <span class="o">-></span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">]:</span> |
| <span class="c1"># Do some expensive initialization with a state |
| </span> <span class="n">state</span> <span class="o">=</span> <span class="n">very_expensive_initialization</span><span class="p">()</span> |
| <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span> |
| <span class="c1"># Use that state for whole iterator. |
| </span> <span class="k">yield</span> <span class="n">calculate_with_state</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">state</span><span class="p">)</span> |
| |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">calculate</span><span class="p">(</span><span class="s">"value"</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span></code></pre></figure> |
| |
| <p>The following example shows how to create this Pandas UDF:</p> |
| |
| <div class="codetabs"> |
| <div data-lang="python"> |
| <div class="highlight"><pre class="codehilite"><code><span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Iterator</span> |
| |
| <span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span> |
| |
| <span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">pandas_udf</span> |
| |
| <span class="n">pdf</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">([</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">],</span> <span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s">"x"</span><span class="p">])</span> |
| <span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">pdf</span><span class="p">)</span> |
| |
| <span class="c1"># Declare the function and create the UDF |
| </span><span class="o">@</span><span class="n">pandas_udf</span><span class="p">(</span><span class="s">"long"</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">plus_one</span><span class="p">(</span><span class="n">iterator</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">])</span> <span class="o">-></span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">]:</span> |
| <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">x</span> <span class="o">+</span> <span class="mi">1</span> |
| |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">plus_one</span><span class="p">(</span><span class="s">"x"</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +-----------+ |
| # |plus_one(x)| |
| # +-----------+ |
| # | 2| |
| # | 3| |
| # | 4| |
| # +-----------+</span></code></pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div> |
| </div> |
| </div> |
| |
| <p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf"><code class="highlighter-rouge">pyspark.sql.functions.pandas_udf</code></a>.</p> |
| |
| <h3 id="iterator-of-multiple-series-to-iterator-of-series">Iterator of Multiple Series to Iterator of Series</h3> |
| |
| <p>The type hint can be expressed as <code class="highlighter-rouge">Iterator[Tuple[pandas.Series, ...]]</code> -> <code class="highlighter-rouge">Iterator[pandas.Series]</code>.</p> |
| |
| <p>Using <code class="highlighter-rouge">pandas_udf</code> with a function that has such type hints creates a Pandas UDF where the |
| given function takes an iterator of a tuple of multiple <code class="highlighter-rouge">pandas.Series</code> and outputs an iterator of <code class="highlighter-rouge">pandas.Series</code>. |
| In this case, the created Pandas UDF requires as many input columns as there are series in the tuple |
| when the Pandas UDF is called. Otherwise, it has the same characteristics and restrictions as the Iterator of Series |
| to Iterator of Series case.</p> |
| |
| <p>The following example shows how to create this Pandas UDF:</p> |
| |
| <div class="codetabs"> |
| <div data-lang="python"> |
| <div class="highlight"><pre class="codehilite"><code><span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Iterator</span><span class="p">,</span> <span class="n">Tuple</span> |
| |
| <span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span> |
| |
| <span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">pandas_udf</span> |
| |
| <span class="n">pdf</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">([</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">],</span> <span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s">"x"</span><span class="p">])</span> |
| <span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">pdf</span><span class="p">)</span> |
| |
| <span class="c1"># Declare the function and create the UDF |
| </span><span class="o">@</span><span class="n">pandas_udf</span><span class="p">(</span><span class="s">"long"</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">multiply_two_cols</span><span class="p">(</span> |
| <span class="n">iterator</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">]])</span> <span class="o">-></span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">]:</span> |
| <span class="k">for</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">a</span> <span class="o">*</span> <span class="n">b</span> |
| |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">multiply_two_cols</span><span class="p">(</span><span class="s">"x"</span><span class="p">,</span> <span class="s">"x"</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +-----------------------+ |
| # |multiply_two_cols(x, x)| |
| # +-----------------------+ |
| # | 1| |
| # | 4| |
| # | 9| |
| # +-----------------------+</span></code></pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div> |
| </div> |
| </div> |
| |
| <p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf"><code class="highlighter-rouge">pyspark.sql.functions.pandas_udf</code></a>.</p> |
| |
| <h3 id="series-to-scalar">Series to Scalar</h3> |
| |
| <p>The type hint can be expressed as <code class="highlighter-rouge">pandas.Series</code>, … -> <code class="highlighter-rouge">Any</code>.</p> |
| |
| <p>Using <code class="highlighter-rouge">pandas_udf</code> with a function that has such type hints creates a Pandas UDF similar |
| to PySpark’s aggregate functions. The given function takes <code class="highlighter-rouge">pandas.Series</code> and returns a scalar value. |
| The return type should be a primitive data type, and the returned scalar can be either a Python |
| primitive type, e.g., <code class="highlighter-rouge">int</code> or <code class="highlighter-rouge">float</code>, or a NumPy data type, e.g., <code class="highlighter-rouge">numpy.int64</code> or <code class="highlighter-rouge">numpy.float64</code>. |
| <code class="highlighter-rouge">Any</code> should ideally be a specific scalar type accordingly.</p> |
| |
| <p>This UDF can be also used with <code class="highlighter-rouge">groupBy().agg()</code> and <a href="api/python/pyspark.sql.html#pyspark.sql.Window"><code class="highlighter-rouge">pyspark.sql.Window</code></a>. |
| It defines an aggregation from one or more <code class="highlighter-rouge">pandas.Series</code> to a scalar value, where each <code class="highlighter-rouge">pandas.Series</code> |
| represents a column within the group or window.</p> |
| |
| <p>Note that this type of UDF does not support partial aggregation, and all data for a group or window |
| will be loaded into memory. Also, only unbounded windows are currently supported with grouped aggregate Pandas |
| UDFs. The following example shows how to use this type of UDF to compute the mean with group-by |
| and window operations:</p> |
| |
| <div class="codetabs"> |
| <div data-lang="python"> |
| <div class="highlight"><pre class="codehilite"><code><span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span> |
| |
| <span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">pandas_udf</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">Window</span> |
| |
| <span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span> |
| <span class="p">[(</span><span class="mi">1</span><span class="p">,</span> <span class="mf">1.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mf">2.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">3.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">5.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">10.0</span><span class="p">)],</span> |
| <span class="p">(</span><span class="s">"id"</span><span class="p">,</span> <span class="s">"v"</span><span class="p">))</span> |
| |
| <span class="c1"># Declare the function and create the UDF |
| </span><span class="o">@</span><span class="n">pandas_udf</span><span class="p">(</span><span class="s">"double"</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">mean_udf</span><span class="p">(</span><span class="n">v</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">)</span> <span class="o">-></span> <span class="nb">float</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">v</span><span class="o">.</span><span class="n">mean</span><span class="p">()</span> |
| |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">mean_udf</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s">'v'</span><span class="p">]))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +-----------+ |
| # |mean_udf(v)| |
| # +-----------+ |
| # | 4.2| |
| # +-----------+ |
| </span> |
| <span class="n">df</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s">"id"</span><span class="p">)</span><span class="o">.</span><span class="n">agg</span><span class="p">(</span><span class="n">mean_udf</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s">'v'</span><span class="p">]))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +---+-----------+ |
| # | id|mean_udf(v)| |
| # +---+-----------+ |
| # | 1| 1.5| |
| # | 2| 6.0| |
| # +---+-----------+ |
| </span> |
| <span class="n">w</span> <span class="o">=</span> <span class="n">Window</span> \ |
| <span class="o">.</span><span class="n">partitionBy</span><span class="p">(</span><span class="s">'id'</span><span class="p">)</span> \ |
| <span class="o">.</span><span class="n">rowsBetween</span><span class="p">(</span><span class="n">Window</span><span class="o">.</span><span class="n">unboundedPreceding</span><span class="p">,</span> <span class="n">Window</span><span class="o">.</span><span class="n">unboundedFollowing</span><span class="p">)</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">withColumn</span><span class="p">(</span><span class="s">'mean_v'</span><span class="p">,</span> <span class="n">mean_udf</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s">'v'</span><span class="p">])</span><span class="o">.</span><span class="n">over</span><span class="p">(</span><span class="n">w</span><span class="p">))</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +---+----+------+ |
| # | id| v|mean_v| |
| # +---+----+------+ |
| # | 1| 1.0| 1.5| |
| # | 1| 2.0| 1.5| |
| # | 2| 3.0| 6.0| |
| # | 2| 5.0| 6.0| |
| # | 2|10.0| 6.0| |
| # +---+----+------+</span></code></pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div> |
| </div> |
| </div> |
| |
| <p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf"><code class="highlighter-rouge">pyspark.sql.functions.pandas_udf</code></a>.</p> |
| |
| <h2 id="pandas-function-apis">Pandas Function APIs</h2> |
| |
| <p>Pandas Function APIs can directly apply a Python native function against the whole <code class="highlighter-rouge">DataFrame</code> by |
| using Pandas instances. Internally they work similarly to Pandas UDFs, using Arrow to transfer |
| data and Pandas to work with the data, which allows vectorized operations. However, a Pandas Function |
| API behaves as a regular API on a PySpark <code class="highlighter-rouge">DataFrame</code> instead of a <code class="highlighter-rouge">Column</code>, and Python type hints in Pandas |
| Function APIs are optional and do not affect how it works internally at this moment, although they |
| might be required in the future.</p> |
| |
| <p>From Spark 3.0, the grouped map Pandas UDF is categorized as a separate Pandas Function API, |
| <code class="highlighter-rouge">DataFrame.groupby().applyInPandas()</code>. It is still possible to use it with <code class="highlighter-rouge">PandasUDFType</code> |
| and <code class="highlighter-rouge">DataFrame.groupby().apply()</code> as before; however, it is preferred to use |
| <code class="highlighter-rouge">DataFrame.groupby().applyInPandas()</code> directly. Using <code class="highlighter-rouge">PandasUDFType</code> will be deprecated |
| in a future release.</p> |
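| |
| <p>As a rough illustration of this change, the sketch below contrasts the pre-3.0 style based on <code class="highlighter-rouge">PandasUDFType.GROUPED_MAP</code> with the preferred <code class="highlighter-rouge">applyInPandas()</code> call. It reuses the <code class="highlighter-rouge">subtract_mean</code> function from the Grouped Map example below and assumes a DataFrame <code class="highlighter-rouge">df</code> with columns <code class="highlighter-rouge">id</code> and <code class="highlighter-rouge">v</code>.</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python">from pyspark.sql.functions import pandas_udf, PandasUDFType |
| |
| def subtract_mean(pdf): |
|     # Per-group transformation on a pandas.DataFrame |
|     return pdf.assign(v=pdf.v - pdf.v.mean()) |
| |
| # Spark 2.x style: still works in 3.0 but will be deprecated. |
| old_style = pandas_udf(subtract_mean, "id long, v double", PandasUDFType.GROUPED_MAP) |
| df.groupby("id").apply(old_style).show() |
| |
| # Preferred Spark 3.0 style: a plain Python function plus an output schema. |
| df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()</code></pre></figure> |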
| |
| <h3 id="grouped-map">Grouped Map</h3> |
| |
| <p>Grouped map operations with Pandas instances are supported by <code class="highlighter-rouge">DataFrame.groupby().applyInPandas()</code>, |
| which requires a Python function that takes a <code class="highlighter-rouge">pandas.DataFrame</code> and returns another <code class="highlighter-rouge">pandas.DataFrame</code>. |
| It maps each group to a <code class="highlighter-rouge">pandas.DataFrame</code> in the Python function.</p> |
| |
| <p>This API implements the “split-apply-combine” pattern which consists of three steps:</p> |
| <ul> |
| <li>Split the data into groups by using <code class="highlighter-rouge">DataFrame.groupBy</code>.</li> |
| <li>Apply a function on each group. The input and output of the function are both <code class="highlighter-rouge">pandas.DataFrame</code>. The |
| input data contains all the rows and columns for each group.</li> |
| <li>Combine the results into a new PySpark <code class="highlighter-rouge">DataFrame</code>.</li> |
| </ul> |
| |
| <p>To use <code class="highlighter-rouge">groupBy().applyInPandas()</code>, the user needs to define the following:</p> |
| <ul> |
| <li>A Python function that defines the computation for each group.</li> |
| <li>A <code class="highlighter-rouge">StructType</code> object or a string that defines the schema of the output PySpark <code class="highlighter-rouge">DataFrame</code>.</li> |
| </ul> |
| |
| <p>The column labels of the returned <code class="highlighter-rouge">pandas.DataFrame</code> must either match the field names in the |
| defined output schema if specified as strings, or match the field data types by position if not |
| strings, e.g. integer indices. See <a href="https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame">pandas.DataFrame</a> |
| on how to label columns when constructing a <code class="highlighter-rouge">pandas.DataFrame</code>.</p> |
| |
| <p>Note that all data for a group will be loaded into memory before the function is applied. This can |
| lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for |
| <a href="#setting-arrow-batch-size">maxRecordsPerBatch</a> is not applied on groups and it is up to the user |
| to ensure that the grouped data will fit into the available memory.</p> |
| |
| <p>The following example shows how to use <code class="highlighter-rouge">groupby().applyInPandas()</code> to subtract the mean from each value |
| in the group.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="python"> |
| <div class="highlight"><pre class="codehilite"><code><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span> |
| <span class="p">[(</span><span class="mi">1</span><span class="p">,</span> <span class="mf">1.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mf">2.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">3.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">5.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mf">10.0</span><span class="p">)],</span> |
| <span class="p">(</span><span class="s">"id"</span><span class="p">,</span> <span class="s">"v"</span><span class="p">))</span> |
| |
| <span class="k">def</span> <span class="nf">subtract_mean</span><span class="p">(</span><span class="n">pdf</span><span class="p">):</span> |
| <span class="c1"># pdf is a pandas.DataFrame |
| </span> <span class="n">v</span> <span class="o">=</span> <span class="n">pdf</span><span class="o">.</span><span class="n">v</span> |
| <span class="k">return</span> <span class="n">pdf</span><span class="o">.</span><span class="n">assign</span><span class="p">(</span><span class="n">v</span><span class="o">=</span><span class="n">v</span> <span class="o">-</span> <span class="n">v</span><span class="o">.</span><span class="n">mean</span><span class="p">())</span> |
| |
| <span class="n">df</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s">"id"</span><span class="p">)</span><span class="o">.</span><span class="n">applyInPandas</span><span class="p">(</span><span class="n">subtract_mean</span><span class="p">,</span> <span class="n">schema</span><span class="o">=</span><span class="s">"id long, v double"</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +---+----+ |
| # | id| v| |
| # +---+----+ |
| # | 1|-0.5| |
| # | 1| 0.5| |
| # | 2|-3.0| |
| # | 2|-1.0| |
| # | 2| 4.0| |
| # +---+----+</span></code></pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div> |
| </div> |
| </div> |
| |
| <p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.GroupedData.applyInPandas"><code class="highlighter-rouge">pyspark.sql.GroupedData.applyInPandas</code></a>.</p> |
| |
| <h3 id="map">Map</h3> |
| |
| <p>Map operations with Pandas instances are supported by <code class="highlighter-rouge">DataFrame.mapInPandas()</code>, which maps an iterator |
| of <code class="highlighter-rouge">pandas.DataFrame</code>s representing the current PySpark <code class="highlighter-rouge">DataFrame</code> to another iterator |
| of <code class="highlighter-rouge">pandas.DataFrame</code>s, and returns the result as a PySpark <code class="highlighter-rouge">DataFrame</code>. The function takes and outputs |
| an iterator of <code class="highlighter-rouge">pandas.DataFrame</code>. It can return output of arbitrary length, in contrast to some |
| Pandas UDFs, although internally it works similarly to the Series to Series Pandas UDF.</p> |
| |
| <p>The following example shows how to use <code class="highlighter-rouge">mapInPandas()</code>:</p> |
| |
| <div class="codetabs"> |
| <div data-lang="python"> |
| <div class="highlight"><pre class="codehilite"><code><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">([(</span><span class="mi">1</span><span class="p">,</span> <span class="mi">21</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="mi">30</span><span class="p">)],</span> <span class="p">(</span><span class="s">"id"</span><span class="p">,</span> <span class="s">"age"</span><span class="p">))</span> |
| |
| <span class="k">def</span> <span class="nf">filter_func</span><span class="p">(</span><span class="n">iterator</span><span class="p">):</span> |
| <span class="k">for</span> <span class="n">pdf</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">pdf</span><span class="p">[</span><span class="n">pdf</span><span class="o">.</span><span class="nb">id</span> <span class="o">==</span> <span class="mi">1</span><span class="p">]</span> |
| |
| <span class="n">df</span><span class="o">.</span><span class="n">mapInPandas</span><span class="p">(</span><span class="n">filter_func</span><span class="p">,</span> <span class="n">schema</span><span class="o">=</span><span class="n">df</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +---+---+ |
| # | id|age| |
| # +---+---+ |
| # | 1| 21| |
| # +---+---+</span></code></pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div> |
| </div> |
| </div> |
| |
| <p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas"><code class="highlighter-rouge">pyspark.sql.DataFrame.mapsInPandas</code></a>.</p> |
| |
| <h3 id="co-grouped-map">Co-grouped Map</h3> |
| |
| <p>Co-grouped map operations with Pandas instances are supported by <code class="highlighter-rouge">DataFrame.groupby().cogroup().applyInPandas()</code>, which |
| allows two PySpark <code class="highlighter-rouge">DataFrame</code>s to be cogrouped by a common key and then a Python function to be applied to each |
| cogroup. It consists of the following steps:</p> |
| <ul> |
| <li>Shuffle the data such that the groups of each dataframe which share a key are cogrouped together.</li> |
| <li>Apply a function to each cogroup. The input of the function is two <code class="highlighter-rouge">pandas.DataFrame</code> (with an optional tuple |
| representing the key). The output of the function is a <code class="highlighter-rouge">pandas.DataFrame</code>.</li> |
| <li>Combine the <code class="highlighter-rouge">pandas.DataFrame</code>s from all groups into a new PySpark <code class="highlighter-rouge">DataFrame</code>.</li> |
| </ul> |
| |
| <p>To use <code class="highlighter-rouge">groupBy().cogroup().applyInPandas()</code>, the user needs to define the following:</p> |
| <ul> |
| <li>A Python function that defines the computation for each cogroup.</li> |
| <li>A <code class="highlighter-rouge">StructType</code> object or a string that defines the schema of the output PySpark <code class="highlighter-rouge">DataFrame</code>.</li> |
| </ul> |
| |
| <p>The column labels of the returned <code class="highlighter-rouge">pandas.DataFrame</code> must either match the field names in the |
| defined output schema if specified as strings, or match the field data types by position if not |
| strings, e.g. integer indices. See <a href="https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame">pandas.DataFrame</a> |
| on how to label columns when constructing a <code class="highlighter-rouge">pandas.DataFrame</code>.</p> |
| |
| <p>Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of |
| memory exceptions, especially if the group sizes are skewed. The configuration for <a href="#setting-arrow-batch-size">maxRecordsPerBatch</a> |
| is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.</p> |
| |
| <p>The following example shows how to use <code class="highlighter-rouge">groupby().cogroup().applyInPandas()</code> to perform an asof join between two datasets.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="python"> |
| <div class="highlight"><pre class="codehilite"><code><span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="n">pd</span> |
| |
| <span class="n">df1</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span> |
| <span class="p">[(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mf">1.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mf">2.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000102</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mf">3.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000102</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mf">4.0</span><span class="p">)],</span> |
| <span class="p">(</span><span class="s">"time"</span><span class="p">,</span> <span class="s">"id"</span><span class="p">,</span> <span class="s">"v1"</span><span class="p">))</span> |
| |
| <span class="n">df2</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span> |
| <span class="p">[(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="s">"x"</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="s">"y"</span><span class="p">)],</span> |
| <span class="p">(</span><span class="s">"time"</span><span class="p">,</span> <span class="s">"id"</span><span class="p">,</span> <span class="s">"v2"</span><span class="p">))</span> |
| |
| <span class="k">def</span> <span class="nf">asof_join</span><span class="p">(</span><span class="n">l</span><span class="p">,</span> <span class="n">r</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">pd</span><span class="o">.</span><span class="n">merge_asof</span><span class="p">(</span><span class="n">l</span><span class="p">,</span> <span class="n">r</span><span class="p">,</span> <span class="n">on</span><span class="o">=</span><span class="s">"time"</span><span class="p">,</span> <span class="n">by</span><span class="o">=</span><span class="s">"id"</span><span class="p">)</span> |
| |
| <span class="n">df1</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s">"id"</span><span class="p">)</span><span class="o">.</span><span class="n">cogroup</span><span class="p">(</span><span class="n">df2</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s">"id"</span><span class="p">))</span><span class="o">.</span><span class="n">applyInPandas</span><span class="p">(</span> |
| <span class="n">asof_join</span><span class="p">,</span> <span class="n">schema</span><span class="o">=</span><span class="s">"time int, id int, v1 double, v2 string"</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +--------+---+---+---+ |
| # | time| id| v1| v2| |
| # +--------+---+---+---+ |
| # |20000101| 1|1.0| x| |
| # |20000102| 1|3.0| x| |
| # |20000101| 2|2.0| y| |
| # |20000102| 2|4.0| y| |
| # +--------+---+---+---+</span></code></pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.</small></div> |
| </div> |
| </div> |
| |
| <p>For detailed usage, please see <a href="api/python/pyspark.sql.html#pyspark.sql.PandasCogroupedOps.applyInPandas"><code class="highlighter-rouge">pyspark.sql.PandasCogroupedOps.applyInPandas()</code></a>.</p> |
| |
| <h2 id="usage-notes">Usage Notes</h2> |
| |
| <h3 id="supported-sql-types">Supported SQL Types</h3> |
| |
| <p>Currently, all Spark SQL data types are supported by Arrow-based conversion except <code class="highlighter-rouge">MapType</code>, |
| <code class="highlighter-rouge">ArrayType</code> of <code class="highlighter-rouge">TimestampType</code>, and nested <code class="highlighter-rouge">StructType</code>.</p> |
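| |
| <p>As a minimal, hypothetical sketch of what this means in practice, the snippet below builds a DataFrame with a <code class="highlighter-rouge">MapType</code> column; converting it with <code class="highlighter-rouge">toPandas()</code> cannot use the Arrow optimization and will either fall back to the non-Arrow conversion or raise an error, depending on whether automatic fallback is enabled.</p> |
| |
| <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code># Hypothetical illustration: MapType is not supported by Arrow-based conversion. |
| from pyspark.sql.functions import create_map, lit |
| |
| df = spark.range(3).select(create_map(lit("key"), "id").alias("m"), "id") |
| pdf = df.toPandas()  # cannot use Arrow; falls back (or errors if fallback is disabled) |
| </code></pre></div></div> |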
| |
| <h3 id="setting-arrow-batch-size">Setting Arrow Batch Size</h3> |
| |
| <p>Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to |
| high memory usage in the JVM. To avoid possible out-of-memory exceptions, the size of the Arrow |
| record batches can be adjusted by setting the conf “spark.sql.execution.arrow.maxRecordsPerBatch” |
| to an integer that determines the maximum number of rows for each batch. The default value is |
| 10,000 records per batch. If the number of columns is large, the value should be lowered |
| accordingly. With this limit, each data partition is split into one or more record batches for |
| processing.</p> |
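| |
| <p>For example, a very wide DataFrame might use a smaller batch size. The snippet below is a minimal sketch; the value 5,000 is an arbitrary illustration, not a recommendation from this guide.</p> |
| |
| <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code># Illustrative only: lower the maximum number of rows per Arrow record batch. |
| spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000") |
| </code></pre></div></div> |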
| |
| <h3 id="timestamp-with-time-zone-semantics">Timestamp with Time Zone Semantics</h3> |
| |
| <p>Spark internally stores timestamps as UTC values, and timestamp data that is brought in without |
| a specified time zone is interpreted as local time and converted to UTC with microsecond resolution. |
| When timestamp data is exported or displayed in Spark, the session time zone is used to localize the |
| timestamp values. The session time zone is set with the configuration ‘spark.sql.session.timeZone’ |
| and defaults to the JVM system local time zone if not set. Pandas uses a <code class="highlighter-rouge">datetime64</code> type with nanosecond |
| resolution, <code class="highlighter-rouge">datetime64[ns]</code>, with an optional time zone on a per-column basis.</p> |
| |
| <p>When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds |
| and each column will be converted to the Spark session time zone and then localized to that time |
| zone, which removes the time zone information and displays the values as local time. This will occur |
| when calling <code class="highlighter-rouge">toPandas()</code> or <code class="highlighter-rouge">pandas_udf</code> with timestamp columns.</p> |
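| |
| <p>The snippet below is a minimal sketch of the Spark-to-Pandas direction; the session time zone value is only an example.</p> |
| |
| <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import datetime |
| |
| # Example only: choose a session time zone, then convert to Pandas. |
| spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles") |
| |
| df = spark.createDataFrame( |
|     [(1, datetime.datetime(2000, 1, 1, 12, 0, 0))], ("id", "ts")) |
| |
| pdf = df.toPandas() |
| print(pdf.dtypes)  # 'ts' is datetime64[ns]; values are shown as session local time |
| </code></pre></div></div> |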
| |
| <p>When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This |
| occurs when calling <code class="highlighter-rouge">createDataFrame</code> with a Pandas DataFrame or when returning a timestamp from a |
| <code class="highlighter-rouge">pandas_udf</code>. These conversions are done automatically to ensure Spark will have data in the |
| expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond |
| values will be truncated.</p> |
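| |
| <p>A small sketch of the Pandas-to-Spark direction is shown below; note how the nanosecond component of the example value is dropped once the data reaches Spark.</p> |
| |
| <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import pandas as pd |
| |
| # Example only: a nanosecond-precision timestamp is truncated to microseconds in Spark. |
| pdf = pd.DataFrame({"ts": pd.to_datetime(["2000-01-01 00:00:00.123456789"])}) |
| df = spark.createDataFrame(pdf) |
| df.show(truncate=False) |
| </code></pre></div></div> |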
| |
| <p>Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is |
| different from a Pandas timestamp. It is recommended to use Pandas time series functionality when |
| working with timestamps in <code class="highlighter-rouge">pandas_udf</code>s to get the best performance; see |
| <a href="https://pandas.pydata.org/pandas-docs/stable/timeseries.html">here</a> for details.</p> |
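| |
| <p>The snippet below is a small, hypothetical illustration of applying Pandas time series arithmetic inside a <code class="highlighter-rouge">pandas_udf</code>; the function and column names are not part of this guide.</p> |
| |
| <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import datetime |
| import pandas as pd |
| from pyspark.sql.functions import pandas_udf |
| |
| # Hypothetical example: shift timestamps by one day using vectorized |
| # Pandas time series arithmetic on the whole Series. |
| @pandas_udf("timestamp") |
| def plus_one_day(ts: pd.Series) -> pd.Series: |
|     return ts + pd.Timedelta(days=1) |
| |
| df = spark.createDataFrame( |
|     [(1, datetime.datetime(2000, 1, 1, 12, 0, 0))], ("id", "ts")) |
| df.select(plus_one_day("ts").alias("ts_plus_one_day")).show() |
| </code></pre></div></div> |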
| |
| <h3 id="recommended-pandas-and-pyarrow-versions">Recommended Pandas and PyArrow Versions</h3> |
| |
| <p>For usage with pyspark.sql, the supported version of Pandas is 0.24.2 and the supported version of |
| PyArrow is 0.15.1. Higher versions may be used; however, compatibility and data correctness cannot be |
| guaranteed and should be verified by the user.</p> |
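| |
| <p>A quick, illustrative way to check the versions installed in the Python environment against the versions above:</p> |
| |
| <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import pandas as pd |
| import pyarrow as pa |
| |
| # Print the versions that PySpark will pick up in this Python environment. |
| print("pandas:", pd.__version__) |
| print("pyarrow:", pa.__version__) |
| </code></pre></div></div> |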
| |
| <h3 id="compatibility-setting-for-pyarrow--0150-and-spark-23x-24x">Compatibility Setting for PyArrow >= 0.15.0 and Spark 2.3.x, 2.4.x</h3> |
| |
| <p>As of Arrow 0.15.0, a change in the binary IPC format requires an environment variable in order to |
| remain compatible with previous versions of Arrow <= 0.14.1. This is only necessary for PySpark |
| users on Spark 2.3.x and 2.4.x who have manually upgraded PyArrow to 0.15.0 or higher. The following |
| can be added to <code class="highlighter-rouge">conf/spark-env.sh</code> to use the legacy Arrow IPC format:</p> |
| |
| <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>ARROW_PRE_0_15_IPC_FORMAT=1 |
| </code></pre></div></div> |
| |
| <p>This will instruct PyArrow >= 0.15.0 to use the legacy IPC format with the older Arrow Java that |
| is in Spark 2.3.x and 2.4.x. Not setting this environment variable will lead to an error similar to |
| the one described in <a href="https://issues.apache.org/jira/browse/SPARK-29367">SPARK-29367</a> when running |
| <code class="highlighter-rouge">pandas_udf</code>s or <code class="highlighter-rouge">toPandas()</code> with Arrow enabled. More information about the Arrow IPC change can |
| be found in the Arrow 0.15.0 release <a href="http://arrow.apache.org/blog/2019/10/06/0.15.0-release/#columnar-streaming-protocol-change-since-0140">blog</a> post.</p> |
| |
| |
| </div> |
| |
| <!-- /container --> |
| </div> |
| |
| <script src="js/vendor/jquery-3.4.1.min.js"></script> |
| <script src="js/vendor/bootstrap.min.js"></script> |
| <script src="js/vendor/anchor.min.js"></script> |
| <script src="js/main.js"></script> |
| |
| <!-- MathJax Section --> |
| <script type="text/x-mathjax-config"> |
| MathJax.Hub.Config({ |
| TeX: { equationNumbers: { autoNumber: "AMS" } } |
| }); |
| </script> |
| <script> |
| // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS. |
| // We could use "//cdn.mathjax...", but that won't support "file://". |
| (function(d, script) { |
| script = d.createElement('script'); |
| script.type = 'text/javascript'; |
| script.async = true; |
| script.onload = function(){ |
| MathJax.Hub.Config({ |
| tex2jax: { |
| inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ], |
| displayMath: [ ["$$","$$"], ["\\[", "\\]"] ], |
| processEscapes: true, |
| skipTags: ['script', 'noscript', 'style', 'textarea', 'pre'] |
| } |
| }); |
| }; |
| script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') + |
| 'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' + |
| '?config=TeX-AMS-MML_HTMLorMML'; |
| d.getElementsByTagName('head')[0].appendChild(script); |
| }(document)); |
| </script> |
| </body> |
| </html> |