blob: 5cebdce0daec8ceeaaf9f86f7dcdcd600cbefd56 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<title>pyspark.sql.PandasCogroupedOps.applyInPandas &#8212; PySpark 3.2.4 documentation</title>
<link rel="stylesheet" href="../../_static/css/index.73d71520a4ca3b99cfee5594769eaaae.css">
<link rel="stylesheet"
href="../../_static/vendor/fontawesome/5.13.0/css/all.min.css">
<link rel="preload" as="font" type="font/woff2" crossorigin
href="../../_static/vendor/fontawesome/5.13.0/webfonts/fa-solid-900.woff2">
<link rel="preload" as="font" type="font/woff2" crossorigin
href="../../_static/vendor/fontawesome/5.13.0/webfonts/fa-brands-400.woff2">
<link rel="stylesheet"
href="../../_static/vendor/open-sans_all/1.44.1/index.css">
<link rel="stylesheet"
href="../../_static/vendor/lato_latin-ext/1.44.1/index.css">
<link rel="stylesheet" href="../../_static/basic.css" type="text/css" />
<link rel="stylesheet" href="../../_static/pygments.css" type="text/css" />
<link rel="stylesheet" type="text/css" href="../../_static/css/pyspark.css" />
<link rel="preload" as="script" href="../../_static/js/index.3da636dd464baa7582d2.js">
<script id="documentation_options" data-url_root="../../" src="../../_static/documentation_options.js"></script>
<script src="../../_static/jquery.js"></script>
<script src="../../_static/underscore.js"></script>
<script src="../../_static/doctools.js"></script>
<script src="../../_static/language_data.js"></script>
<script src="../../_static/copybutton.js"></script>
<script crossorigin="anonymous" integrity="sha256-Ae2Vz/4ePdIu6ZyI/5ZGsYnb+m0JlOmKPjt6XZ9JJkA=" src="https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.4/require.min.js"></script>
<script async="async" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
<script type="text/x-mathjax-config">MathJax.Hub.Config({"tex2jax": {"inlineMath": [["$", "$"], ["\\(", "\\)"]], "processEscapes": true, "ignoreClass": "document", "processClass": "math|output_area"}})</script>
<link rel="canonical" href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.PandasCogroupedOps.applyInPandas.html" />
<link rel="search" title="Search" href="../../search.html" />
<link rel="next" title="pyspark.sql.Catalog.cacheTable" href="pyspark.sql.Catalog.cacheTable.html" />
<link rel="prev" title="pyspark.sql.GroupedData.sum" href="pyspark.sql.GroupedData.sum.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="en" />
</head>
<body data-spy="scroll" data-target="#bd-toc-nav" data-offset="80">
<nav class="navbar navbar-light navbar-expand-lg bg-light fixed-top bd-navbar" id="navbar-main">
<div class="container-xl">
<a class="navbar-brand" href="../../index.html">
<img src="../../_static/spark-logo-reverse.png" class="logo" alt="logo" />
</a>
<button class="navbar-toggler" type="button" data-toggle="collapse" data-target="#navbar-menu" aria-controls="navbar-menu" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div id="navbar-menu" class="col-lg-9 collapse navbar-collapse">
<ul id="navbar-main-elements" class="navbar-nav mr-auto">
<li class="nav-item ">
<a class="nav-link" href="../../getting_started/index.html">Getting Started</a>
</li>
<li class="nav-item ">
<a class="nav-link" href="../../user_guide/index.html">User Guide</a>
</li>
<li class="nav-item active">
<a class="nav-link" href="../index.html">API Reference</a>
</li>
<li class="nav-item ">
<a class="nav-link" href="../../development/index.html">Development</a>
</li>
<li class="nav-item ">
<a class="nav-link" href="../../migration_guide/index.html">Migration Guide</a>
</li>
</ul>
<ul class="navbar-nav">
</ul>
</div>
</div>
</nav>
<div class="container-xl">
<div class="row">
<div class="col-12 col-md-3 bd-sidebar"><form class="bd-search d-flex align-items-center" action="../../search.html" method="get">
<i class="icon fas fa-search"></i>
<input type="search" class="form-control" name="q" id="search-input" placeholder="Search the docs ..." aria-label="Search the docs ..." autocomplete="off" >
</form>
<nav class="bd-links" id="bd-docs-nav" aria-label="Main navigation">
<div class="bd-toc-item active">
<ul class="nav bd-sidenav">
<li class="active">
<a href="../pyspark.sql.html">Spark SQL</a>
</li>
<li class="">
<a href="../pyspark.pandas/index.html">Pandas API on Spark</a>
</li>
<li class="">
<a href="../pyspark.ss.html">Structured Streaming</a>
</li>
<li class="">
<a href="../pyspark.ml.html">MLlib (DataFrame-based)</a>
</li>
<li class="">
<a href="../pyspark.streaming.html">Spark Streaming</a>
</li>
<li class="">
<a href="../pyspark.mllib.html">MLlib (RDD-based)</a>
</li>
<li class="">
<a href="../pyspark.html">Spark Core</a>
</li>
<li class="">
<a href="../pyspark.resource.html">Resource Management</a>
</li>
</ul>
</div>
</nav>
</div>
<div class="d-none d-xl-block col-xl-2 bd-toc">
<nav id="bd-toc-nav">
<ul class="nav section-nav flex-column">
</ul>
</nav>
</div>
<main class="col-12 col-md-9 col-xl-7 py-md-5 pl-md-5 pr-md-4 bd-content" role="main">
<div>
<div class="section" id="pyspark-sql-pandascogroupedops-applyinpandas">
<h1>pyspark.sql.PandasCogroupedOps.applyInPandas<a class="headerlink" href="#pyspark-sql-pandascogroupedops-applyinpandas" title="Permalink to this headline">¶</a></h1>
<dl class="py method">
<dt id="pyspark.sql.PandasCogroupedOps.applyInPandas">
<code class="sig-prename descclassname">PandasCogroupedOps.</code><code class="sig-name descname">applyInPandas</code><span class="sig-paren">(</span><em class="sig-param"><span class="n">func</span></em>, <em class="sig-param"><span class="n">schema</span></em><span class="sig-paren">)</span><a class="reference internal" href="../../_modules/pyspark/sql/pandas/group_ops.html#PandasCogroupedOps.applyInPandas"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#pyspark.sql.PandasCogroupedOps.applyInPandas" title="Permalink to this definition">¶</a></dt>
<dd><p>Applies a function to each cogroup using pandas and returns the result
as a <cite>DataFrame</cite>.</p>
<p>The function should take two <cite>pandas.DataFrame</cite>s and return another
<cite>pandas.DataFrame</cite>. For each side of the cogroup, all columns are passed together as a
<cite>pandas.DataFrame</cite> to the user-function and the returned <cite>pandas.DataFrame</cite>s are combined as
a <a class="reference internal" href="pyspark.sql.DataFrame.html#pyspark.sql.DataFrame" title="pyspark.sql.DataFrame"><code class="xref py py-class docutils literal notranslate"><span class="pre">DataFrame</span></code></a>.</p>
<p>The <cite>schema</cite> should be a <code class="xref py py-class docutils literal notranslate"><span class="pre">StructType</span></code> describing the schema of the returned
<cite>pandas.DataFrame</cite>. The column labels of the returned <cite>pandas.DataFrame</cite> must either match
the field names in the defined schema if specified as strings, or match the
field data types by position if not strings, e.g. integer indices.
The length of the returned <cite>pandas.DataFrame</cite> can be arbitrary.</p>
<div class="versionadded">
<p><span class="versionmodified added">New in version 3.0.0.</span></p>
</div>
<dl class="field-list">
<dt class="field-odd">Parameters</dt>
<dd class="field-odd"><dl>
<dt><strong>func</strong><span class="classifier">function</span></dt><dd><p>a Python native function that takes two <cite>pandas.DataFrame</cite>s, and
outputs a <cite>pandas.DataFrame</cite>, or that takes one tuple (grouping keys) and two
pandas <code class="docutils literal notranslate"><span class="pre">DataFrame</span></code>s, and outputs a pandas <code class="docutils literal notranslate"><span class="pre">DataFrame</span></code>.</p>
</dd>
<dt><strong>schema</strong><span class="classifier"><a class="reference internal" href="pyspark.sql.types.DataType.html#pyspark.sql.types.DataType" title="pyspark.sql.types.DataType"><code class="xref py py-class docutils literal notranslate"><span class="pre">pyspark.sql.types.DataType</span></code></a> or str</span></dt><dd><p>the return type of the <cite>func</cite> in PySpark. The value can be either a
<a class="reference internal" href="pyspark.sql.types.DataType.html#pyspark.sql.types.DataType" title="pyspark.sql.types.DataType"><code class="xref py py-class docutils literal notranslate"><span class="pre">pyspark.sql.types.DataType</span></code></a> object or a DDL-formatted type string.</p>
</dd>
</dl>
</dd>
</dl>
<div class="admonition seealso">
<p class="admonition-title">See also</p>
<dl class="simple">
<dt><a class="reference internal" href="pyspark.sql.functions.pandas_udf.html#pyspark.sql.functions.pandas_udf" title="pyspark.sql.functions.pandas_udf"><code class="xref py py-obj docutils literal notranslate"><span class="pre">pyspark.sql.functions.pandas_udf</span></code></a></dt><dd></dd>
</dl>
</div>
<p class="rubric">Notes</p>
<p>This function requires a full shuffle. All the data of a cogroup will be loaded
into memory, so the user should be aware of the potential OOM risk if data is skewed
and certain groups are too large to fit in memory.</p>
<p>If returning a new <cite>pandas.DataFrame</cite> constructed with a dictionary, it is
recommended to explicitly index the columns by name to ensure the positions are correct,
or alternatively use an <cite>OrderedDict</cite>.
For example, <cite>pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])</cite> or
<cite>pd.DataFrame(OrderedDict([('id', ids), ('a', data)]))</cite>.</p>
<p>This API is experimental.</p>
<p class="rubric">Examples</p>
<div class="doctest highlight-default notranslate"><div class="highlight"><pre><span></span><span class="gp">&gt;&gt;&gt; </span><span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">pandas_udf</span>
<span class="gp">&gt;&gt;&gt; </span><span class="n">df1</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span>
<span class="gp">... </span> <span class="p">[(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mf">1.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mf">2.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000102</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mf">3.0</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000102</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mf">4.0</span><span class="p">)],</span>
<span class="gp">... </span> <span class="p">(</span><span class="s2">&quot;time&quot;</span><span class="p">,</span> <span class="s2">&quot;id&quot;</span><span class="p">,</span> <span class="s2">&quot;v1&quot;</span><span class="p">))</span>
<span class="gp">&gt;&gt;&gt; </span><span class="n">df2</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span>
<span class="gp">... </span> <span class="p">[(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="s2">&quot;x&quot;</span><span class="p">),</span> <span class="p">(</span><span class="mi">20000101</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="s2">&quot;y&quot;</span><span class="p">)],</span>
<span class="gp">... </span> <span class="p">(</span><span class="s2">&quot;time&quot;</span><span class="p">,</span> <span class="s2">&quot;id&quot;</span><span class="p">,</span> <span class="s2">&quot;v2&quot;</span><span class="p">))</span>
<span class="gp">&gt;&gt;&gt; </span><span class="k">def</span> <span class="nf">asof_join</span><span class="p">(</span><span class="n">l</span><span class="p">,</span> <span class="n">r</span><span class="p">):</span>
<span class="gp">... </span> <span class="k">return</span> <span class="n">pd</span><span class="o">.</span><span class="n">merge_asof</span><span class="p">(</span><span class="n">l</span><span class="p">,</span> <span class="n">r</span><span class="p">,</span> <span class="n">on</span><span class="o">=</span><span class="s2">&quot;time&quot;</span><span class="p">,</span> <span class="n">by</span><span class="o">=</span><span class="s2">&quot;id&quot;</span><span class="p">)</span>
<span class="gp">&gt;&gt;&gt; </span><span class="n">df1</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s2">&quot;id&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">cogroup</span><span class="p">(</span><span class="n">df2</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s2">&quot;id&quot;</span><span class="p">))</span><span class="o">.</span><span class="n">applyInPandas</span><span class="p">(</span>
<span class="gp">... </span> <span class="n">asof_join</span><span class="p">,</span> <span class="n">schema</span><span class="o">=</span><span class="s2">&quot;time int, id int, v1 double, v2 string&quot;</span>
<span class="gp">... </span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="go">+--------+---+---+---+</span>
<span class="go">| time| id| v1| v2|</span>
<span class="go">+--------+---+---+---+</span>
<span class="go">|20000101| 1|1.0| x|</span>
<span class="go">|20000102| 1|3.0| x|</span>
<span class="go">|20000101| 2|2.0| y|</span>
<span class="go">|20000102| 2|4.0| y|</span>
<span class="go">+--------+---+---+---+</span>
</pre></div>
</div>
<p>Alternatively, the user can define a function that takes three arguments. In this case,
the grouping key(s) will be passed as the first argument and the data will be passed as the
second and third arguments. The grouping key(s) will be passed as a tuple of numpy data
types, e.g., <cite>numpy.int32</cite> and <cite>numpy.float64</cite>. The data will still be passed in as two
<cite>pandas.DataFrame</cite>s containing all columns from the original Spark DataFrames.</p>
<div class="doctest highlight-default notranslate"><div class="highlight"><pre><span></span><span class="gp">&gt;&gt;&gt; </span><span class="k">def</span> <span class="nf">asof_join</span><span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">l</span><span class="p">,</span> <span class="n">r</span><span class="p">):</span>
<span class="gp">... </span> <span class="k">if</span> <span class="n">k</span> <span class="o">==</span> <span class="p">(</span><span class="mi">1</span><span class="p">,):</span>
<span class="gp">... </span> <span class="k">return</span> <span class="n">pd</span><span class="o">.</span><span class="n">merge_asof</span><span class="p">(</span><span class="n">l</span><span class="p">,</span> <span class="n">r</span><span class="p">,</span> <span class="n">on</span><span class="o">=</span><span class="s2">&quot;time&quot;</span><span class="p">,</span> <span class="n">by</span><span class="o">=</span><span class="s2">&quot;id&quot;</span><span class="p">)</span>
<span class="gp">... </span> <span class="k">else</span><span class="p">:</span>
<span class="gp">... </span> <span class="k">return</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">(</span><span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;time&#39;</span><span class="p">,</span> <span class="s1">&#39;id&#39;</span><span class="p">,</span> <span class="s1">&#39;v1&#39;</span><span class="p">,</span> <span class="s1">&#39;v2&#39;</span><span class="p">])</span>
<span class="gp">&gt;&gt;&gt; </span><span class="n">df1</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s2">&quot;id&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">cogroup</span><span class="p">(</span><span class="n">df2</span><span class="o">.</span><span class="n">groupby</span><span class="p">(</span><span class="s2">&quot;id&quot;</span><span class="p">))</span><span class="o">.</span><span class="n">applyInPandas</span><span class="p">(</span>
<span class="gp">... </span> <span class="n">asof_join</span><span class="p">,</span> <span class="s2">&quot;time int, id int, v1 double, v2 string&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="go">+--------+---+---+---+</span>
<span class="go">| time| id| v1| v2|</span>
<span class="go">+--------+---+---+---+</span>
<span class="go">|20000101| 1|1.0| x|</span>
<span class="go">|20000102| 1|3.0| x|</span>
<span class="go">+--------+---+---+---+</span>
</pre></div>
</div>
</dd></dl>
</div>
</div>
<div class='prev-next-bottom'>
<a class='left-prev' id="prev-link" href="pyspark.sql.GroupedData.sum.html" title="previous page">pyspark.sql.GroupedData.sum</a>
<a class='right-next' id="next-link" href="pyspark.sql.Catalog.cacheTable.html" title="next page">pyspark.sql.Catalog.cacheTable</a>
</div>
</main>
</div>
</div>
<script src="../../_static/js/index.3da636dd464baa7582d2.js"></script>
<footer class="footer mt-5 mt-md-0">
<div class="container">
<p>
&copy; Copyright .<br/>
Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 3.0.4.<br/>
</p>
</div>
</footer>
</body>
</html>