<!-- blob: da36f7e3548f1cff5340c839427ea47c94e597ef [file] [log] [blame] -->
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="utf-8" />
  <title>pyspark.testing.utils &#8212; PySpark 3.5.5 documentation</title>

  <!-- Theme stylesheets; the digest query string acts as a cache buster -->
  <link href="../../../_static/styles/theme.css?digest=1999514e3f237ded88cf" rel="stylesheet">
  <link href="../../../_static/styles/pydata-sphinx-theme.css?digest=1999514e3f237ded88cf" rel="stylesheet">

  <!-- Font Awesome icons; the two webfonts are preloaded (crossorigin is required for font preloads) -->
  <link rel="stylesheet" href="../../../_static/vendor/fontawesome/5.13.0/css/all.min.css">
  <link rel="preload" as="font" type="font/woff2" crossorigin href="../../../_static/vendor/fontawesome/5.13.0/webfonts/fa-solid-900.woff2">
  <link rel="preload" as="font" type="font/woff2" crossorigin href="../../../_static/vendor/fontawesome/5.13.0/webfonts/fa-brands-400.woff2">

  <!-- NOTE(review): pydata-sphinx-theme.css is already linked above with a digest query string;
       this second link looks redundant. Confirm before removing it, because dropping it
       changes the cascade order relative to the Font Awesome stylesheet. -->
  <link rel="stylesheet" href="../../../_static/styles/pydata-sphinx-theme.css" type="text/css" />
  <link rel="stylesheet" href="../../../_static/pygments.css" type="text/css" />
  <link rel="stylesheet" type="text/css" href="../../../_static/copybutton.css" />
  <link rel="stylesheet" type="text/css" href="../../../_static/css/pyspark.css" />

  <!-- Scripts: jQuery is loaded here because the inline version-switcher script in the navbar uses $ -->
  <link rel="preload" as="script" href="../../../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf">
  <script id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script>
  <script src="../../../_static/jquery.js"></script>
  <script src="../../../_static/underscore.js"></script>
  <script src="../../../_static/doctools.js"></script>
  <script src="../../../_static/language_data.js"></script>
  <script src="../../../_static/clipboard.min.js"></script>
  <script src="../../../_static/copybutton.js"></script>
  <script crossorigin="anonymous" integrity="sha256-Ae2Vz/4ePdIu6ZyI/5ZGsYnb+m0JlOmKPjt6XZ9JJkA=" src="https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.4/require.min.js"></script>
  <script async="async" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
  <script type="text/x-mathjax-config">MathJax.Hub.Config({"tex2jax": {"inlineMath": [["$", "$"], ["\\(", "\\)"]], "processEscapes": true, "ignoreClass": "document", "processClass": "math|output_area"}})</script>

  <link rel="canonical" href="https://spark.apache.org/docs/latest/api/python/_modules/pyspark/testing/utils.html" />
  <link rel="search" title="Search" href="../../../search.html" />
  <meta name="viewport" content="width=device-width, initial-scale=1" />
  <!-- Was "None", which looks like an unset template value rendered literally; the page content is English -->
  <meta name="docsearch:language" content="en">
  <!-- Google Analytics -->
</head>
<body data-spy="scroll" data-target="#bd-toc-nav" data-offset="80">
<div class="container-fluid" id="banner"></div>
<nav class="navbar navbar-light navbar-expand-lg bg-light fixed-top bd-navbar" id="navbar-main"><div class="container-xl">
<div id="navbar-start">
  <!-- Brand logo linking to the documentation home page. The image is the link's only
       content, so its alt text names the link destination. -->
  <a class="navbar-brand" href="../../../index.html">
    <img src="../../../_static/spark-logo-reverse.png" class="logo" alt="PySpark documentation home">
  </a>
</div>
<!-- Toggle button for the collapsible navbar section (#navbar-collapsible) -->
<button
  type="button"
  class="navbar-toggler"
  data-toggle="collapse"
  data-target="#navbar-collapsible"
  aria-label="Toggle navigation"
  aria-controls="navbar-collapsible"
  aria-expanded="false">
  <span class="navbar-toggler-icon"></span>
</button>
<div id="navbar-collapsible" class="col-lg-9 collapse navbar-collapse">
<div class="mr-auto" id="navbar-center">
  <div class="navbar-center-item">
    <!-- Top-level documentation sections; every href is relative to this page's directory -->
    <ul class="navbar-nav" id="navbar-main-elements">
      <li class="toctree-l1 nav-item">
        <a href="../../../index.html" class="reference internal nav-link">
          Overview
        </a>
      </li>
      <li class="toctree-l1 nav-item">
        <a href="../../../getting_started/index.html" class="reference internal nav-link">
          Getting Started
        </a>
      </li>
      <li class="toctree-l1 nav-item">
        <a href="../../../user_guide/index.html" class="reference internal nav-link">
          User Guides
        </a>
      </li>
      <li class="toctree-l1 nav-item">
        <a href="../../../reference/index.html" class="reference internal nav-link">
          API Reference
        </a>
      </li>
      <li class="toctree-l1 nav-item">
        <a href="../../../development/index.html" class="reference internal nav-link">
          Development
        </a>
      </li>
      <li class="toctree-l1 nav-item">
        <a href="../../../migration_guide/index.html" class="reference internal nav-link">
          Migration Guides
        </a>
      </li>
    </ul>
  </div>
</div>
<div id="navbar-end">
<div class="navbar-end-item">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
  <!-- Toggle button labelled with the version these docs were built for.
       aria-haspopup and aria-expanded announce it to assistive technology
       as a menu button that starts collapsed. -->
  <button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">
    3.5.5
    <span class="caret"></span>
  </button>
  <div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
    <!-- dropdown will be populated by javascript on page load -->
  </div>
</div>
<script type="text/javascript">
/**
 * Build the docs-homepage URL for one version entry.
 *
 * @param {{version: string}} entry - entry whose `version` is substituted into the URL pattern.
 * @returns {string} The pattern with its first "{version}" placeholder replaced.
 */
function buildURL(entry) {
  var urlPattern = "https://spark.apache.org/docs/{version}/api/python/index.html"; // supplied by jinja
  return urlPattern.replace("{version}", entry.version);
}
/**
 * Click handler for the version-switcher links.
 *
 * Sends a HEAD request to find out whether the page currently being viewed
 * also exists in the docs version the clicked link points to. If it does,
 * navigate to that page; otherwise navigate to the other version's homepage
 * (the link's own href).
 *
 * @param {Event} event - click event; its target carries the other version's
 *     homepage URL in the "href" attribute.
 * @returns {boolean} Always false, so that the click does not also follow the
 *     link directly while the probe is in flight.
 */
function checkPageExistsAndRedirect(event) {
  const currentFilePath = "_modules/pyspark/testing/utils.html",
        otherDocsHomepage = event.target.getAttribute("href");
  let tryUrl;
  try {
    // Resolve the page path relative to the homepage URL. Plain string
    // concatenation is wrong here: the homepage ends in "index.html", so it
    // would yield ".../index.html_modules/pyspark/testing/utils.html".
    tryUrl = new URL(currentFilePath, otherDocsHomepage).href;
  } catch (e) {
    // href is missing or not an absolute URL: nothing to probe, go straight there
    location.href = otherDocsHomepage;
    return false;
  }
  $.ajax({
    type: 'HEAD',
    url: tryUrl,
    // if the page exists, go there
    success: function() {
      location.href = tryUrl;
    }
  }).fail(function() {
    // page is absent in that version (or the probe failed): use its homepage
    location.href = otherDocsHomepage;
  });
  return false;
}
// Populate the version switcher dropdown; this function expression is invoked immediately
(function () {
  const versionsUrl = "https://spark.apache.org/static/versions.json";

  // Append one dropdown link for a single versions.json entry.
  // Used as a $.each callback, so it deliberately returns nothing
  // (returning false would stop the iteration).
  function appendVersionLink(index, entry) {
    // entries without a custom name (e.g., "latest") are labelled with their version string
    if (!("name" in entry)) {
      entry.name = entry.version;
    }
    // the link initially points at the homepage of that docs version
    entry.url = buildURL(entry);

    const link = document.createElement("a");
    link.setAttribute("class", "list-group-item list-group-item-action py-1");
    link.setAttribute("href", `${entry.url}`);
    link.textContent = `${entry.name}`;
    // on click, try the same page in the other version before falling back to its homepage
    link.onclick = checkPageExistsAndRedirect;
    $("#version_switcher").append(link);
  }

  // All nodes are created in one pass, before any per-link AJAX probing happens,
  // so the dropdown order matches the order of entries in the JSON.
  $.getJSON(versionsUrl, function (data, textStatus, jqXHR) {
    $.each(data, appendVersionLink);
  });
})();
</script>
</div>
</div>
</div>
</div>
</nav>
<div class="container-xl">
<div class="row">
<!-- Only show if we have sidebars configured, else just a small margin -->
<div class="col-12 col-md-3 bd-sidebar">
  <div class="sidebar-start-items">
    <!-- Docs search box; submits the query as the "q" parameter to search.html via GET -->
    <form class="bd-search d-flex align-items-center" action="../../../search.html" method="get">
      <!-- Decorative icon: hidden from assistive technology, the input carries the accessible name -->
      <i class="icon fas fa-search" aria-hidden="true"></i>
      <input type="search" class="form-control" name="q" id="search-input" placeholder="Search the docs ..." aria-label="Search the docs ..." autocomplete="off">
    </form>
    <nav class="bd-links" id="bd-docs-nav" aria-label="Main navigation">
      <div class="bd-toc-item active">
      </div>
    </nav>
  </div>
  <div class="sidebar-end-items">
  </div>
</div>
<!-- Presumably the column for the in-page table of contents (class bd-toc); it has no content on this page. -->
<div class="d-none d-xl-block col-xl-2 bd-toc">
</div>
<main class="col-12 col-md-9 col-xl-7 py-md-5 pl-md-5 pr-md-4 bd-content" role="main">
<div>
<h1>Source code for pyspark.testing.utils</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">glob</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">os</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">struct</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">sys</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">unittest</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">difflib</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">time</span><span class="w"> </span><span class="kn">import</span> <span class="n">time</span><span class="p">,</span> <span class="n">sleep</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="p">(</span>
<span class="n">Any</span><span class="p">,</span>
<span class="n">Optional</span><span class="p">,</span>
<span class="n">Union</span><span class="p">,</span>
<span class="n">Dict</span><span class="p">,</span>
<span class="n">List</span><span class="p">,</span>
<span class="n">Tuple</span><span class="p">,</span>
<span class="n">Iterator</span><span class="p">,</span>
<span class="p">)</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">itertools</span><span class="w"> </span><span class="kn">import</span> <span class="n">zip_longest</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark</span><span class="w"> </span><span class="kn">import</span> <span class="n">SparkContext</span><span class="p">,</span> <span class="n">SparkConf</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.errors</span><span class="w"> </span><span class="kn">import</span> <span class="n">PySparkAssertionError</span><span class="p">,</span> <span class="n">PySparkException</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.find_spark_home</span><span class="w"> </span><span class="kn">import</span> <span class="n">_find_spark_home</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.sql.dataframe</span><span class="w"> </span><span class="kn">import</span> <span class="n">DataFrame</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.sql</span><span class="w"> </span><span class="kn">import</span> <span class="n">Row</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.sql.types</span><span class="w"> </span><span class="kn">import</span> <span class="n">StructType</span><span class="p">,</span> <span class="n">AtomicType</span><span class="p">,</span> <span class="n">StructField</span>
<span class="n">have_scipy</span> <span class="o">=</span> <span class="kc">False</span>
<span class="n">have_numpy</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">scipy.sparse</span> <span class="c1"># noqa: F401</span>
<span class="n">have_scipy</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="c1"># No SciPy, but that&#39;s okay, we&#39;ll skip those tests</span>
<span class="k">pass</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">numpy</span><span class="w"> </span><span class="k">as</span><span class="w"> </span><span class="nn">np</span> <span class="c1"># noqa: F401</span>
<span class="n">have_numpy</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="c1"># No NumPy, but that&#39;s okay, we&#39;ll skip those tests</span>
<span class="k">pass</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s2">&quot;assertDataFrameEqual&quot;</span><span class="p">,</span> <span class="s2">&quot;assertSchemaEqual&quot;</span><span class="p">]</span>
<span class="n">SPARK_HOME</span> <span class="o">=</span> <span class="n">_find_spark_home</span><span class="p">()</span>
<span class="k">def</span><span class="w"> </span><span class="nf">read_int</span><span class="p">(</span><span class="n">b</span><span class="p">):</span>
<span class="k">return</span> <span class="n">struct</span><span class="o">.</span><span class="n">unpack</span><span class="p">(</span><span class="s2">&quot;!i&quot;</span><span class="p">,</span> <span class="n">b</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">def</span><span class="w"> </span><span class="nf">write_int</span><span class="p">(</span><span class="n">i</span><span class="p">):</span>
<span class="k">return</span> <span class="n">struct</span><span class="o">.</span><span class="n">pack</span><span class="p">(</span><span class="s2">&quot;!i&quot;</span><span class="p">,</span> <span class="n">i</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="nf">eventually</span><span class="p">(</span><span class="n">condition</span><span class="p">,</span> <span class="n">timeout</span><span class="o">=</span><span class="mf">30.0</span><span class="p">,</span> <span class="n">catch_assertions</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Wait a given amount of time for a condition to pass, else fail with an error.</span>
<span class="sd"> This is a helper utility for PySpark tests.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> condition : function</span>
<span class="sd"> Function that checks for termination conditions. condition() can return:</span>
<span class="sd"> - True: Conditions met. Return without error.</span>
<span class="sd"> - other value: Conditions not met yet. Continue. Upon timeout,</span>
<span class="sd"> include last such value in error message.</span>
<span class="sd"> Note that this method may be called at any time during</span>
<span class="sd"> streaming execution (e.g., even before any results</span>
<span class="sd"> have been created).</span>
<span class="sd"> timeout : int</span>
<span class="sd"> Number of seconds to wait. Default 30 seconds.</span>
<span class="sd"> catch_assertions : bool</span>
<span class="sd"> If False (default), do not catch AssertionErrors.</span>
<span class="sd"> If True, catch AssertionErrors; continue, but save</span>
<span class="sd"> error to throw upon timeout.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">start_time</span> <span class="o">=</span> <span class="n">time</span><span class="p">()</span>
<span class="n">lastValue</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">while</span> <span class="n">time</span><span class="p">()</span> <span class="o">-</span> <span class="n">start_time</span> <span class="o">&lt;</span> <span class="n">timeout</span><span class="p">:</span>
<span class="k">if</span> <span class="n">catch_assertions</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">lastValue</span> <span class="o">=</span> <span class="n">condition</span><span class="p">()</span>
<span class="k">except</span> <span class="ne">AssertionError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="n">lastValue</span> <span class="o">=</span> <span class="n">e</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">lastValue</span> <span class="o">=</span> <span class="n">condition</span><span class="p">()</span>
<span class="k">if</span> <span class="n">lastValue</span> <span class="ow">is</span> <span class="kc">True</span><span class="p">:</span>
<span class="k">return</span>
<span class="n">sleep</span><span class="p">(</span><span class="mf">0.01</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">lastValue</span><span class="p">,</span> <span class="ne">AssertionError</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">lastValue</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">AssertionError</span><span class="p">(</span>
<span class="s2">&quot;Test failed due to timeout after </span><span class="si">%g</span><span class="s2"> sec, with last condition returning: </span><span class="si">%s</span><span class="s2">&quot;</span>
<span class="o">%</span> <span class="p">(</span><span class="n">timeout</span><span class="p">,</span> <span class="n">lastValue</span><span class="p">)</span>
<span class="p">)</span>
<span class="k">class</span><span class="w"> </span><span class="nc">QuietTest</span><span class="p">:</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sc</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log4j</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">log4j</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">old_level</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">log4j</span><span class="o">.</span><span class="n">LogManager</span><span class="o">.</span><span class="n">getRootLogger</span><span class="p">()</span><span class="o">.</span><span class="n">getLevel</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log4j</span><span class="o">.</span><span class="n">LogManager</span><span class="o">.</span><span class="n">getRootLogger</span><span class="p">()</span><span class="o">.</span><span class="n">setLevel</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">log4j</span><span class="o">.</span><span class="n">Level</span><span class="o">.</span><span class="n">FATAL</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exc_type</span><span class="p">,</span> <span class="n">exc_val</span><span class="p">,</span> <span class="n">exc_tb</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log4j</span><span class="o">.</span><span class="n">LogManager</span><span class="o">.</span><span class="n">getRootLogger</span><span class="p">()</span><span class="o">.</span><span class="n">setLevel</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">old_level</span><span class="p">)</span>
<span class="k">class</span><span class="w"> </span><span class="nc">PySparkTestCase</span><span class="p">(</span><span class="n">unittest</span><span class="o">.</span><span class="n">TestCase</span><span class="p">):</span>
<span class="k">def</span><span class="w"> </span><span class="nf">setUp</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_old_sys_path</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">sys</span><span class="o">.</span><span class="n">path</span><span class="p">)</span>
<span class="n">class_name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="s2">&quot;local[4]&quot;</span><span class="p">,</span> <span class="n">class_name</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="nf">tearDown</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sc</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="n">sys</span><span class="o">.</span><span class="n">path</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_old_sys_path</span>
<span class="k">class</span><span class="w"> </span><span class="nc">ReusedPySparkTestCase</span><span class="p">(</span><span class="n">unittest</span><span class="o">.</span><span class="n">TestCase</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span><span class="w"> </span><span class="nf">conf</span><span class="p">(</span><span class="bp">cls</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Override this in subclasses to supply a more specific conf</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">SparkConf</span><span class="p">()</span>
<span class="nd">@classmethod</span>
<span class="k">def</span><span class="w"> </span><span class="nf">setUpClass</span><span class="p">(</span><span class="bp">cls</span><span class="p">):</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="s2">&quot;local[4]&quot;</span><span class="p">,</span> <span class="bp">cls</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">conf</span><span class="o">=</span><span class="bp">cls</span><span class="o">.</span><span class="n">conf</span><span class="p">())</span>
<span class="nd">@classmethod</span>
<span class="k">def</span><span class="w"> </span><span class="nf">tearDownClass</span><span class="p">(</span><span class="bp">cls</span><span class="p">):</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">sc</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="k">class</span><span class="w"> </span><span class="nc">ByteArrayOutput</span><span class="p">:</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">=</span> <span class="nb">bytearray</span><span class="p">()</span>
<span class="k">def</span><span class="w"> </span><span class="nf">write</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">b</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">+=</span> <span class="n">b</span>
<span class="k">def</span><span class="w"> </span><span class="nf">close</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">pass</span>
<span class="k">def</span><span class="w"> </span><span class="nf">search_jar</span><span class="p">(</span><span class="n">project_relative_path</span><span class="p">,</span> <span class="n">sbt_jar_name_prefix</span><span class="p">,</span> <span class="n">mvn_jar_name_prefix</span><span class="p">):</span>
<span class="c1"># Note that &#39;sbt_jar_name_prefix&#39; and &#39;mvn_jar_name_prefix&#39; are used since the prefix can</span>
<span class="c1"># vary for SBT or Maven specifically. See also SPARK-26856</span>
<span class="n">project_full_path</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">SPARK_HOME</span><span class="p">,</span> <span class="n">project_relative_path</span><span class="p">)</span>
<span class="c1"># We should ignore the following jars</span>
<span class="n">ignored_jar_suffixes</span> <span class="o">=</span> <span class="p">(</span><span class="s2">&quot;javadoc.jar&quot;</span><span class="p">,</span> <span class="s2">&quot;sources.jar&quot;</span><span class="p">,</span> <span class="s2">&quot;test-sources.jar&quot;</span><span class="p">,</span> <span class="s2">&quot;tests.jar&quot;</span><span class="p">)</span>
<span class="c1"># Search jar in the project dir using the jar name_prefix for both sbt build and maven</span>
<span class="c1"># build because the artifact jars are in different directories.</span>
<span class="n">sbt_build</span> <span class="o">=</span> <span class="n">glob</span><span class="o">.</span><span class="n">glob</span><span class="p">(</span>
<span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">project_full_path</span><span class="p">,</span> <span class="s2">&quot;target/scala-*/</span><span class="si">%s</span><span class="s2">*.jar&quot;</span> <span class="o">%</span> <span class="n">sbt_jar_name_prefix</span><span class="p">)</span>
<span class="p">)</span>
<span class="n">maven_build</span> <span class="o">=</span> <span class="n">glob</span><span class="o">.</span><span class="n">glob</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">project_full_path</span><span class="p">,</span> <span class="s2">&quot;target/</span><span class="si">%s</span><span class="s2">*.jar&quot;</span> <span class="o">%</span> <span class="n">mvn_jar_name_prefix</span><span class="p">))</span>
<span class="n">jar_paths</span> <span class="o">=</span> <span class="n">sbt_build</span> <span class="o">+</span> <span class="n">maven_build</span>
<span class="n">jars</span> <span class="o">=</span> <span class="p">[</span><span class="n">jar</span> <span class="k">for</span> <span class="n">jar</span> <span class="ow">in</span> <span class="n">jar_paths</span> <span class="k">if</span> <span class="ow">not</span> <span class="n">jar</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="n">ignored_jar_suffixes</span><span class="p">)]</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">jars</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">None</span>
<span class="k">elif</span> <span class="nb">len</span><span class="p">(</span><span class="n">jars</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">1</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span><span class="s2">&quot;Found multiple JARs: </span><span class="si">%s</span><span class="s2">; please remove all but one&quot;</span> <span class="o">%</span> <span class="p">(</span><span class="s2">&quot;, &quot;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">jars</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">jars</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">def</span><span class="w"> </span><span class="nf">_terminal_color_support</span><span class="p">():</span>
<span class="k">try</span><span class="p">:</span>
<span class="c1"># determine if environment supports color</span>
<span class="n">script</span> <span class="o">=</span> <span class="s2">&quot;$(test $(tput colors)) &amp;&amp; $(test $(tput colors) -ge 8) &amp;&amp; echo true || echo false&quot;</span>
<span class="k">return</span> <span class="n">os</span><span class="o">.</span><span class="n">popen</span><span class="p">(</span><span class="n">script</span><span class="p">)</span><span class="o">.</span><span class="n">read</span><span class="p">()</span>
<span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">def</span><span class="w"> </span><span class="nf">_context_diff</span><span class="p">(</span><span class="n">actual</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">],</span> <span class="n">expected</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">],</span> <span class="n">n</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">3</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Modified from difflib context_diff API,</span>
<span class="sd"> see original code here: https://github.com/python/cpython/blob/main/Lib/difflib.py#L1180</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">red</span><span class="p">(</span><span class="n">s</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">str</span><span class="p">:</span>
<span class="n">red_color</span> <span class="o">=</span> <span class="s2">&quot;</span><span class="se">\033</span><span class="s2">[31m&quot;</span>
<span class="n">no_color</span> <span class="o">=</span> <span class="s2">&quot;</span><span class="se">\033</span><span class="s2">[0m&quot;</span>
<span class="k">return</span> <span class="n">red_color</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">s</span><span class="p">)</span> <span class="o">+</span> <span class="n">no_color</span>
<span class="n">prefix</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">insert</span><span class="o">=</span><span class="s2">&quot;+ &quot;</span><span class="p">,</span> <span class="n">delete</span><span class="o">=</span><span class="s2">&quot;- &quot;</span><span class="p">,</span> <span class="n">replace</span><span class="o">=</span><span class="s2">&quot;! &quot;</span><span class="p">,</span> <span class="n">equal</span><span class="o">=</span><span class="s2">&quot; &quot;</span><span class="p">)</span>
<span class="k">for</span> <span class="n">group</span> <span class="ow">in</span> <span class="n">difflib</span><span class="o">.</span><span class="n">SequenceMatcher</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">actual</span><span class="p">,</span> <span class="n">expected</span><span class="p">)</span><span class="o">.</span><span class="n">get_grouped_opcodes</span><span class="p">(</span><span class="n">n</span><span class="p">):</span>
<span class="k">yield</span> <span class="s2">&quot;*** actual ***&quot;</span>
<span class="k">if</span> <span class="nb">any</span><span class="p">(</span><span class="n">tag</span> <span class="ow">in</span> <span class="p">{</span><span class="s2">&quot;replace&quot;</span><span class="p">,</span> <span class="s2">&quot;delete&quot;</span><span class="p">}</span> <span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">group</span><span class="p">):</span>
<span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">i1</span><span class="p">,</span> <span class="n">i2</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">group</span><span class="p">:</span>
<span class="k">for</span> <span class="n">line</span> <span class="ow">in</span> <span class="n">actual</span><span class="p">[</span><span class="n">i1</span><span class="p">:</span><span class="n">i2</span><span class="p">]:</span>
<span class="k">if</span> <span class="n">tag</span> <span class="o">!=</span> <span class="s2">&quot;equal&quot;</span> <span class="ow">and</span> <span class="n">_terminal_color_support</span><span class="p">():</span>
<span class="k">yield</span> <span class="n">red</span><span class="p">(</span><span class="n">prefix</span><span class="p">[</span><span class="n">tag</span><span class="p">]</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">line</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">prefix</span><span class="p">[</span><span class="n">tag</span><span class="p">]</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">line</span><span class="p">)</span>
<span class="k">yield</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span>
<span class="k">yield</span> <span class="s2">&quot;*** expected ***&quot;</span>
<span class="k">if</span> <span class="nb">any</span><span class="p">(</span><span class="n">tag</span> <span class="ow">in</span> <span class="p">{</span><span class="s2">&quot;replace&quot;</span><span class="p">,</span> <span class="s2">&quot;insert&quot;</span><span class="p">}</span> <span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">group</span><span class="p">):</span>
<span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">j1</span><span class="p">,</span> <span class="n">j2</span> <span class="ow">in</span> <span class="n">group</span><span class="p">:</span>
<span class="k">for</span> <span class="n">line</span> <span class="ow">in</span> <span class="n">expected</span><span class="p">[</span><span class="n">j1</span><span class="p">:</span><span class="n">j2</span><span class="p">]:</span>
<span class="k">if</span> <span class="n">tag</span> <span class="o">!=</span> <span class="s2">&quot;equal&quot;</span> <span class="ow">and</span> <span class="n">_terminal_color_support</span><span class="p">():</span>
<span class="k">yield</span> <span class="n">red</span><span class="p">(</span><span class="n">prefix</span><span class="p">[</span><span class="n">tag</span><span class="p">]</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">line</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">prefix</span><span class="p">[</span><span class="n">tag</span><span class="p">]</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">line</span><span class="p">)</span>
<span class="k">class</span><span class="w"> </span><span class="nc">PySparkErrorTestUtils</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> This util provides functions for accurate and consistent error testing</span>
<span class="sd"> based on PySpark error classes.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">check_error</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">exception</span><span class="p">:</span> <span class="n">PySparkException</span><span class="p">,</span>
<span class="n">error_class</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">):</span>
<span class="c1"># Test if given error is an instance of PySparkException.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertIsInstance</span><span class="p">(</span>
<span class="n">exception</span><span class="p">,</span>
<span class="n">PySparkException</span><span class="p">,</span>
<span class="sa">f</span><span class="s2">&quot;checkError requires &#39;PySparkException&#39;, got &#39;</span><span class="si">{</span><span class="n">exception</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="si">}</span><span class="s2">&#39;.&quot;</span><span class="p">,</span>
<span class="p">)</span>
<span class="c1"># Test error class</span>
<span class="n">expected</span> <span class="o">=</span> <span class="n">error_class</span>
<span class="n">actual</span> <span class="o">=</span> <span class="n">exception</span><span class="o">.</span><span class="n">getErrorClass</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertEqual</span><span class="p">(</span>
<span class="n">expected</span><span class="p">,</span> <span class="n">actual</span><span class="p">,</span> <span class="sa">f</span><span class="s2">&quot;Expected error class was &#39;</span><span class="si">{</span><span class="n">expected</span><span class="si">}</span><span class="s2">&#39;, got &#39;</span><span class="si">{</span><span class="n">actual</span><span class="si">}</span><span class="s2">&#39;.&quot;</span>
<span class="p">)</span>
<span class="c1"># Test message parameters</span>
<span class="n">expected</span> <span class="o">=</span> <span class="n">message_parameters</span>
<span class="n">actual</span> <span class="o">=</span> <span class="n">exception</span><span class="o">.</span><span class="n">getMessageParameters</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertEqual</span><span class="p">(</span>
<span class="n">expected</span><span class="p">,</span> <span class="n">actual</span><span class="p">,</span> <span class="sa">f</span><span class="s2">&quot;Expected message parameters was &#39;</span><span class="si">{</span><span class="n">expected</span><span class="si">}</span><span class="s2">&#39;, got &#39;</span><span class="si">{</span><span class="n">actual</span><span class="si">}</span><span class="s2">&#39;&quot;</span>
<span class="p">)</span>
<div class="viewcode-block" id="assertSchemaEqual"><a class="viewcode-back" href="../../../reference/api/pyspark.testing.assertSchemaEqual.html#pyspark.testing.assertSchemaEqual">[docs]</a><span class="k">def</span><span class="w"> </span><span class="nf">assertSchemaEqual</span><span class="p">(</span><span class="n">actual</span><span class="p">:</span> <span class="n">StructType</span><span class="p">,</span> <span class="n">expected</span><span class="p">:</span> <span class="n">StructType</span><span class="p">):</span>
<span class="w"> </span><span class="sa">r</span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A util function to assert equality between DataFrame schemas `actual` and `expected`.</span>
<span class="sd"> .. versionadded:: 3.5.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> actual : StructType</span>
<span class="sd"> The DataFrame schema that is being compared or tested.</span>
<span class="sd"> expected : StructType</span>
<span class="sd"> The expected schema, for comparison with the actual schema.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> When assertSchemaEqual fails, the error message uses the Python `difflib` library to display</span>
<span class="sd"> a diff log of the `actual` and `expected` schemas.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, DoubleType</span>
<span class="sd"> &gt;&gt;&gt; s1 = StructType([StructField(&quot;names&quot;, ArrayType(DoubleType(), True), True)])</span>
<span class="sd"> &gt;&gt;&gt; s2 = StructType([StructField(&quot;names&quot;, ArrayType(DoubleType(), True), True)])</span>
<span class="sd"> &gt;&gt;&gt; assertSchemaEqual(s1, s2) # pass, schemas are identical</span>
<span class="sd"> &gt;&gt;&gt; df1 = spark.createDataFrame(data=[(1, 1000), (2, 3000)], schema=[&quot;id&quot;, &quot;number&quot;])</span>
<span class="sd"> &gt;&gt;&gt; df2 = spark.createDataFrame(data=[(&quot;1&quot;, 1000), (&quot;2&quot;, 5000)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; assertSchemaEqual(df1.schema, df2.schema) # doctest: +IGNORE_EXCEPTION_DETAIL</span>
<span class="sd"> Traceback (most recent call last):</span>
<span class="sd"> ...</span>
<span class="sd"> PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.</span>
<span class="sd"> --- actual</span>
<span class="sd"> +++ expected</span>
<span class="sd"> - StructType([StructField(&#39;id&#39;, LongType(), True), StructField(&#39;number&#39;, LongType(), True)])</span>
<span class="sd"> ? ^^ ^^^^^</span>
<span class="sd"> + StructType([StructField(&#39;id&#39;, StringType(), True), StructField(&#39;amount&#39;, LongType(), True)])</span>
<span class="sd"> ? ^^^^ ++++ ^</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="n">StructType</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;UNSUPPORTED_DATA_TYPE&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;data_type&quot;</span><span class="p">:</span> <span class="nb">type</span><span class="p">(</span><span class="n">actual</span><span class="p">)},</span>
<span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expected</span><span class="p">,</span> <span class="n">StructType</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;UNSUPPORTED_DATA_TYPE&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;data_type&quot;</span><span class="p">:</span> <span class="nb">type</span><span class="p">(</span><span class="n">expected</span><span class="p">)},</span>
<span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="nf">compare_schemas_ignore_nullable</span><span class="p">(</span><span class="n">s1</span><span class="p">:</span> <span class="n">StructType</span><span class="p">,</span> <span class="n">s2</span><span class="p">:</span> <span class="n">StructType</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">s1</span><span class="p">)</span> <span class="o">!=</span> <span class="nb">len</span><span class="p">(</span><span class="n">s2</span><span class="p">):</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="n">zipped</span> <span class="o">=</span> <span class="n">zip_longest</span><span class="p">(</span><span class="n">s1</span><span class="p">,</span> <span class="n">s2</span><span class="p">)</span>
<span class="k">for</span> <span class="n">sf1</span><span class="p">,</span> <span class="n">sf2</span> <span class="ow">in</span> <span class="n">zipped</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">compare_structfields_ignore_nullable</span><span class="p">(</span><span class="n">sf1</span><span class="p">,</span> <span class="n">sf2</span><span class="p">):</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">def</span><span class="w"> </span><span class="nf">compare_structfields_ignore_nullable</span><span class="p">(</span><span class="n">actualSF</span><span class="p">:</span> <span class="n">StructField</span><span class="p">,</span> <span class="n">expectedSF</span><span class="p">:</span> <span class="n">StructField</span><span class="p">):</span>
<span class="k">if</span> <span class="n">actualSF</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">expectedSF</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">elif</span> <span class="n">actualSF</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">or</span> <span class="n">expectedSF</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">if</span> <span class="n">actualSF</span><span class="o">.</span><span class="n">name</span> <span class="o">!=</span> <span class="n">expectedSF</span><span class="o">.</span><span class="n">name</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">compare_datatypes_ignore_nullable</span><span class="p">(</span><span class="n">actualSF</span><span class="o">.</span><span class="n">dataType</span><span class="p">,</span> <span class="n">expectedSF</span><span class="o">.</span><span class="n">dataType</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="nf">compare_datatypes_ignore_nullable</span><span class="p">(</span><span class="n">dt1</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="n">dt2</span><span class="p">:</span> <span class="n">Any</span><span class="p">):</span>
<span class="c1"># checks datatype equality, using recursion to ignore nullable</span>
<span class="k">if</span> <span class="n">dt1</span><span class="o">.</span><span class="n">typeName</span><span class="p">()</span> <span class="o">==</span> <span class="n">dt2</span><span class="o">.</span><span class="n">typeName</span><span class="p">():</span>
<span class="k">if</span> <span class="n">dt1</span><span class="o">.</span><span class="n">typeName</span><span class="p">()</span> <span class="o">==</span> <span class="s2">&quot;array&quot;</span><span class="p">:</span>
<span class="k">return</span> <span class="n">compare_datatypes_ignore_nullable</span><span class="p">(</span><span class="n">dt1</span><span class="o">.</span><span class="n">elementType</span><span class="p">,</span> <span class="n">dt2</span><span class="o">.</span><span class="n">elementType</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">dt1</span><span class="o">.</span><span class="n">typeName</span><span class="p">()</span> <span class="o">==</span> <span class="s2">&quot;struct&quot;</span><span class="p">:</span>
<span class="k">return</span> <span class="n">compare_schemas_ignore_nullable</span><span class="p">(</span><span class="n">dt1</span><span class="p">,</span> <span class="n">dt2</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="c1"># ignore nullable flag by default</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">compare_schemas_ignore_nullable</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="n">expected</span><span class="p">):</span>
<span class="n">generated_diff</span> <span class="o">=</span> <span class="n">difflib</span><span class="o">.</span><span class="n">ndiff</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">actual</span><span class="p">)</span><span class="o">.</span><span class="n">splitlines</span><span class="p">(),</span> <span class="nb">str</span><span class="p">(</span><span class="n">expected</span><span class="p">)</span><span class="o">.</span><span class="n">splitlines</span><span class="p">())</span>
<span class="n">error_msg</span> <span class="o">=</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">generated_diff</span><span class="p">)</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;DIFFERENT_SCHEMA&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;error_msg&quot;</span><span class="p">:</span> <span class="n">error_msg</span><span class="p">},</span>
<span class="p">)</span></div>
<span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="n">TYPE_CHECKING</span>
<span class="k">if</span> <span class="n">TYPE_CHECKING</span><span class="p">:</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">pandas</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">pyspark.pandas</span>
<div class="viewcode-block" id="assertDataFrameEqual"><a class="viewcode-back" href="../../../reference/api/pyspark.testing.assertDataFrameEqual.html#pyspark.testing.assertDataFrameEqual">[docs]</a><span class="k">def</span><span class="w"> </span><span class="nf">assertDataFrameEqual</span><span class="p">(</span>
<span class="n">actual</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">DataFrame</span><span class="p">,</span> <span class="s2">&quot;pandas.DataFrame&quot;</span><span class="p">,</span> <span class="s2">&quot;pyspark.pandas.DataFrame&quot;</span><span class="p">,</span> <span class="n">List</span><span class="p">[</span><span class="n">Row</span><span class="p">]],</span>
<span class="n">expected</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">DataFrame</span><span class="p">,</span> <span class="s2">&quot;pandas.DataFrame&quot;</span><span class="p">,</span> <span class="s2">&quot;pyspark.pandas.DataFrame&quot;</span><span class="p">,</span> <span class="n">List</span><span class="p">[</span><span class="n">Row</span><span class="p">]],</span>
<span class="n">checkRowOrder</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="n">rtol</span><span class="p">:</span> <span class="nb">float</span> <span class="o">=</span> <span class="mf">1e-5</span><span class="p">,</span>
<span class="n">atol</span><span class="p">:</span> <span class="nb">float</span> <span class="o">=</span> <span class="mf">1e-8</span><span class="p">,</span>
<span class="p">):</span>
<span class="w"> </span><span class="sa">r</span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A util function to assert equality between `actual` and `expected`</span>
<span class="sd"> (DataFrames or lists of Rows), with optional parameters `checkRowOrder`, `rtol`, and `atol`.</span>
<span class="sd"> Supports Spark, Spark Connect, pandas, and pandas-on-Spark DataFrames.</span>
<span class="sd"> For more information about pandas-on-Spark DataFrame equality, see the docs for</span>
<span class="sd"> `assertPandasOnSparkEqual`.</span>
<span class="sd"> .. versionadded:: 3.5.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> actual : DataFrame (Spark, Spark Connect, pandas, or pandas-on-Spark) or list of Rows</span>
<span class="sd"> The DataFrame that is being compared or tested.</span>
<span class="sd"> expected : DataFrame (Spark, Spark Connect, pandas, or pandas-on-Spark) or list of Rows</span>
<span class="sd"> The expected result of the operation, for comparison with the actual result.</span>
<span class="sd"> checkRowOrder : bool, optional</span>
<span class="sd"> A flag indicating whether the order of rows should be considered in the comparison.</span>
<span class="sd"> If set to `False` (default), the row order is not taken into account.</span>
<span class="sd"> If set to `True`, the order of rows is important and will be checked during comparison.</span>
<span class="sd"> (See Notes)</span>
<span class="sd"> rtol : float, optional</span>
<span class="sd"> The relative tolerance, used in asserting approximate equality for float values in actual</span>
<span class="sd"> and expected. Set to 1e-5 by default. (See Notes)</span>
<span class="sd"> atol : float, optional</span>
<span class="sd"> The absolute tolerance, used in asserting approximate equality for float values in actual</span>
<span class="sd"> and expected. Set to 1e-8 by default. (See Notes)</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> When `assertDataFrameEqual` fails, the error message uses the Python `difflib` library to</span>
<span class="sd"> display a diff log of each row that differs in `actual` and `expected`.</span>
<span class="sd"> For `checkRowOrder`, note that PySpark DataFrame ordering is non-deterministic, unless</span>
<span class="sd"> explicitly sorted.</span>
<span class="sd"> Note that schema equality is checked only when `expected` is a DataFrame (not a list of Rows).</span>
<span class="sd"> For DataFrames with float values, assertDataFrameEqual asserts approximate equality.</span>
<span class="sd"> Two float values a and b are approximately equal if the following equation is True:</span>
<span class="sd"> ``absolute(a - b) &lt;= (atol + rtol * absolute(b))``.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; df1 = spark.createDataFrame(data=[(&quot;1&quot;, 1000), (&quot;2&quot;, 3000)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; df2 = spark.createDataFrame(data=[(&quot;1&quot;, 1000), (&quot;2&quot;, 3000)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1, df2) # pass, DataFrames are identical</span>
<span class="sd"> &gt;&gt;&gt; df1 = spark.createDataFrame(data=[(&quot;1&quot;, 0.1), (&quot;2&quot;, 3.23)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; df2 = spark.createDataFrame(data=[(&quot;1&quot;, 0.109), (&quot;2&quot;, 3.23)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1, df2, rtol=1e-1) # pass, DataFrames are approx equal by rtol</span>
<span class="sd"> &gt;&gt;&gt; df1 = spark.createDataFrame(data=[(1, 1000), (2, 3000)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; list_of_rows = [Row(1, 1000), Row(2, 3000)]</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1, list_of_rows) # pass, actual and expected data are equal</span>
<span class="sd"> &gt;&gt;&gt; import pyspark.pandas as ps</span>
<span class="sd"> &gt;&gt;&gt; df1 = ps.DataFrame({&#39;a&#39;: [1, 2, 3], &#39;b&#39;: [4, 5, 6], &#39;c&#39;: [7, 8, 9]})</span>
<span class="sd"> &gt;&gt;&gt; df2 = ps.DataFrame({&#39;a&#39;: [1, 2, 3], &#39;b&#39;: [4, 5, 6], &#39;c&#39;: [7, 8, 9]})</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1, df2) # pass, pandas-on-Spark DataFrames are equal</span>
<span class="sd"> &gt;&gt;&gt; df1 = spark.createDataFrame(</span>
<span class="sd"> ... data=[(&quot;1&quot;, 1000.00), (&quot;2&quot;, 3000.00), (&quot;3&quot;, 2000.00)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; df2 = spark.createDataFrame(</span>
<span class="sd"> ... data=[(&quot;1&quot;, 1001.00), (&quot;2&quot;, 3000.00), (&quot;3&quot;, 2003.00)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1, df2) # doctest: +IGNORE_EXCEPTION_DETAIL</span>
<span class="sd"> Traceback (most recent call last):</span>
<span class="sd"> ...</span>
<span class="sd"> PySparkAssertionError: [DIFFERENT_ROWS] Results do not match: ( 66.66667 % )</span>
<span class="sd"> *** actual ***</span>
<span class="sd"> ! Row(id=&#39;1&#39;, amount=1000.0)</span>
<span class="sd"> Row(id=&#39;2&#39;, amount=3000.0)</span>
<span class="sd"> ! Row(id=&#39;3&#39;, amount=2000.0)</span>
<span class="sd"> *** expected ***</span>
<span class="sd"> ! Row(id=&#39;1&#39;, amount=1001.0)</span>
<span class="sd"> Row(id=&#39;2&#39;, amount=3000.0)</span>
<span class="sd"> ! Row(id=&#39;3&#39;, amount=2003.0)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">actual</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">expected</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">elif</span> <span class="n">actual</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_TYPE_DF_EQUALITY_ARG&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;expected_type&quot;</span><span class="p">:</span> <span class="s2">&quot;Union[DataFrame, ps.DataFrame, List[Row]]&quot;</span><span class="p">,</span>
<span class="s2">&quot;arg_name&quot;</span><span class="p">:</span> <span class="s2">&quot;actual&quot;</span><span class="p">,</span>
<span class="s2">&quot;actual_type&quot;</span><span class="p">:</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="n">expected</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_TYPE_DF_EQUALITY_ARG&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;expected_type&quot;</span><span class="p">:</span> <span class="s2">&quot;Union[DataFrame, ps.DataFrame, List[Row]]&quot;</span><span class="p">,</span>
<span class="s2">&quot;arg_name&quot;</span><span class="p">:</span> <span class="s2">&quot;expected&quot;</span><span class="p">,</span>
<span class="s2">&quot;actual_type&quot;</span><span class="p">:</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="n">has_pandas</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">try</span><span class="p">:</span>
<span class="c1"># If pandas dependencies are available, allow pandas or pandas-on-Spark DataFrame</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">pyspark.pandas</span><span class="w"> </span><span class="k">as</span><span class="w"> </span><span class="nn">ps</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">pandas</span><span class="w"> </span><span class="k">as</span><span class="w"> </span><span class="nn">pd</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.testing.pandasutils</span><span class="w"> </span><span class="kn">import</span> <span class="n">PandasOnSparkTestUtils</span>
<span class="n">has_pandas</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="c1"># no pandas, so we won&#39;t call pandasutils functions</span>
<span class="k">pass</span>
<span class="k">if</span> <span class="n">has_pandas</span><span class="p">:</span>
<span class="k">if</span> <span class="p">(</span>
<span class="nb">isinstance</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span>
<span class="ow">or</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expected</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span>
<span class="ow">or</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="n">ps</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span>
<span class="ow">or</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expected</span><span class="p">,</span> <span class="n">ps</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span>
<span class="p">):</span>
<span class="c1"># handle pandas DataFrames</span>
<span class="c1"># assert approximate equality for float data</span>
<span class="k">return</span> <span class="n">PandasOnSparkTestUtils</span><span class="p">()</span><span class="o">.</span><span class="n">assert_eq</span><span class="p">(</span>
<span class="n">actual</span><span class="p">,</span> <span class="n">expected</span><span class="p">,</span> <span class="n">almost</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">rtol</span><span class="o">=</span><span class="n">rtol</span><span class="p">,</span> <span class="n">atol</span><span class="o">=</span><span class="n">atol</span><span class="p">,</span> <span class="n">check_row_order</span><span class="o">=</span><span class="n">checkRowOrder</span>
<span class="p">)</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.sql.utils</span><span class="w"> </span><span class="kn">import</span> <span class="n">get_dataframe_class</span>
<span class="c1"># if is_remote(), allow Connect DataFrame</span>
<span class="n">SparkDataFrame</span> <span class="o">=</span> <span class="n">get_dataframe_class</span><span class="p">()</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="p">(</span><span class="n">DataFrame</span><span class="p">,</span> <span class="n">SparkDataFrame</span><span class="p">,</span> <span class="nb">list</span><span class="p">)):</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_TYPE_DF_EQUALITY_ARG&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;expected_type&quot;</span><span class="p">:</span> <span class="s2">&quot;Union[DataFrame, ps.DataFrame, List[Row]]&quot;</span><span class="p">,</span>
<span class="s2">&quot;arg_name&quot;</span><span class="p">:</span> <span class="s2">&quot;actual&quot;</span><span class="p">,</span>
<span class="s2">&quot;actual_type&quot;</span><span class="p">:</span> <span class="nb">type</span><span class="p">(</span><span class="n">actual</span><span class="p">),</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expected</span><span class="p">,</span> <span class="p">(</span><span class="n">DataFrame</span><span class="p">,</span> <span class="n">SparkDataFrame</span><span class="p">,</span> <span class="nb">list</span><span class="p">)):</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_TYPE_DF_EQUALITY_ARG&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;expected_type&quot;</span><span class="p">:</span> <span class="s2">&quot;Union[DataFrame, ps.DataFrame, List[Row]]&quot;</span><span class="p">,</span>
<span class="s2">&quot;arg_name&quot;</span><span class="p">:</span> <span class="s2">&quot;expected&quot;</span><span class="p">,</span>
<span class="s2">&quot;actual_type&quot;</span><span class="p">:</span> <span class="nb">type</span><span class="p">(</span><span class="n">expected</span><span class="p">),</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="nf">compare_rows</span><span class="p">(</span><span class="n">r1</span><span class="p">:</span> <span class="n">Row</span><span class="p">,</span> <span class="n">r2</span><span class="p">:</span> <span class="n">Row</span><span class="p">):</span>
<span class="k">def</span><span class="w"> </span><span class="nf">compare_vals</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="n">val2</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="nb">list</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val2</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="n">val1</span><span class="p">)</span> <span class="o">==</span> <span class="nb">len</span><span class="p">(</span><span class="n">val2</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">all</span><span class="p">(</span>
<span class="n">compare_vals</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">)</span> <span class="k">for</span> <span class="n">x</span><span class="p">,</span> <span class="n">y</span> <span class="ow">in</span> <span class="nb">zip</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="n">val2</span><span class="p">)</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="n">Row</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val2</span><span class="p">,</span> <span class="n">Row</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">all</span><span class="p">(</span><span class="n">compare_vals</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">)</span> <span class="k">for</span> <span class="n">x</span><span class="p">,</span> <span class="n">y</span> <span class="ow">in</span> <span class="nb">zip</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="n">val2</span><span class="p">))</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="nb">dict</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val2</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span>
<span class="nb">len</span><span class="p">(</span><span class="n">val1</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span> <span class="o">==</span> <span class="nb">len</span><span class="p">(</span><span class="n">val2</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span>
<span class="ow">and</span> <span class="n">val1</span><span class="o">.</span><span class="n">keys</span><span class="p">()</span> <span class="o">==</span> <span class="n">val2</span><span class="o">.</span><span class="n">keys</span><span class="p">()</span>
<span class="ow">and</span> <span class="nb">all</span><span class="p">(</span><span class="n">compare_vals</span><span class="p">(</span><span class="n">val1</span><span class="p">[</span><span class="n">k</span><span class="p">],</span> <span class="n">val2</span><span class="p">[</span><span class="n">k</span><span class="p">])</span> <span class="k">for</span> <span class="n">k</span> <span class="ow">in</span> <span class="n">val1</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="nb">float</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val2</span><span class="p">,</span> <span class="nb">float</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">abs</span><span class="p">(</span><span class="n">val1</span> <span class="o">-</span> <span class="n">val2</span><span class="p">)</span> <span class="o">&gt;</span> <span class="p">(</span><span class="n">atol</span> <span class="o">+</span> <span class="n">rtol</span> <span class="o">*</span> <span class="nb">abs</span><span class="p">(</span><span class="n">val2</span><span class="p">)):</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">if</span> <span class="n">val1</span> <span class="o">!=</span> <span class="n">val2</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">if</span> <span class="n">r1</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">r2</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">elif</span> <span class="n">r1</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">or</span> <span class="n">r2</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">return</span> <span class="n">compare_vals</span><span class="p">(</span><span class="n">r1</span><span class="p">,</span> <span class="n">r2</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="nf">assert_rows_equal</span><span class="p">(</span><span class="n">rows1</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="n">Row</span><span class="p">],</span> <span class="n">rows2</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="n">Row</span><span class="p">]):</span>
<span class="n">zipped</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">zip_longest</span><span class="p">(</span><span class="n">rows1</span><span class="p">,</span> <span class="n">rows2</span><span class="p">))</span>
<span class="n">diff_rows_cnt</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">diff_rows</span> <span class="o">=</span> <span class="kc">False</span>
<span class="n">rows_str1</span> <span class="o">=</span> <span class="s2">&quot;&quot;</span>
<span class="n">rows_str2</span> <span class="o">=</span> <span class="s2">&quot;&quot;</span>
<span class="c1"># count different rows</span>
<span class="k">for</span> <span class="n">r1</span><span class="p">,</span> <span class="n">r2</span> <span class="ow">in</span> <span class="n">zipped</span><span class="p">:</span>
<span class="n">rows_str1</span> <span class="o">+=</span> <span class="nb">str</span><span class="p">(</span><span class="n">r1</span><span class="p">)</span> <span class="o">+</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span>
<span class="n">rows_str2</span> <span class="o">+=</span> <span class="nb">str</span><span class="p">(</span><span class="n">r2</span><span class="p">)</span> <span class="o">+</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">compare_rows</span><span class="p">(</span><span class="n">r1</span><span class="p">,</span> <span class="n">r2</span><span class="p">):</span>
<span class="n">diff_rows_cnt</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="n">diff_rows</span> <span class="o">=</span> <span class="kc">True</span>
<span class="n">generated_diff</span> <span class="o">=</span> <span class="n">_context_diff</span><span class="p">(</span>
<span class="n">actual</span><span class="o">=</span><span class="n">rows_str1</span><span class="o">.</span><span class="n">splitlines</span><span class="p">(),</span> <span class="n">expected</span><span class="o">=</span><span class="n">rows_str2</span><span class="o">.</span><span class="n">splitlines</span><span class="p">(),</span> <span class="n">n</span><span class="o">=</span><span class="nb">len</span><span class="p">(</span><span class="n">zipped</span><span class="p">)</span>
<span class="p">)</span>
<span class="k">if</span> <span class="n">diff_rows</span><span class="p">:</span>
<span class="n">error_msg</span> <span class="o">=</span> <span class="s2">&quot;Results do not match: &quot;</span>
<span class="n">percent_diff</span> <span class="o">=</span> <span class="p">(</span><span class="n">diff_rows_cnt</span> <span class="o">/</span> <span class="nb">len</span><span class="p">(</span><span class="n">zipped</span><span class="p">))</span> <span class="o">*</span> <span class="mi">100</span>
<span class="n">error_msg</span> <span class="o">+=</span> <span class="s2">&quot;( </span><span class="si">%.5f</span><span class="s2"> </span><span class="si">%%</span><span class="s2"> )&quot;</span> <span class="o">%</span> <span class="n">percent_diff</span>
<span class="n">error_msg</span> <span class="o">+=</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span> <span class="o">+</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">generated_diff</span><span class="p">)</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;DIFFERENT_ROWS&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;error_msg&quot;</span><span class="p">:</span> <span class="n">error_msg</span><span class="p">},</span>
<span class="p">)</span>
<span class="c1"># convert actual and expected to list</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="nb">list</span><span class="p">)</span> <span class="ow">and</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expected</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="c1"># only compare schema if expected is not a List</span>
<span class="n">assertSchemaEqual</span><span class="p">(</span><span class="n">actual</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="n">expected</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="k">if</span> <span class="n">actual</span><span class="o">.</span><span class="n">isStreaming</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;UNSUPPORTED_OPERATION&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;operation&quot;</span><span class="p">:</span> <span class="s2">&quot;assertDataFrameEqual on streaming DataFrame&quot;</span><span class="p">},</span>
<span class="p">)</span>
<span class="n">actual_list</span> <span class="o">=</span> <span class="n">actual</span><span class="o">.</span><span class="n">collect</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">actual_list</span> <span class="o">=</span> <span class="n">actual</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expected</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="k">if</span> <span class="n">expected</span><span class="o">.</span><span class="n">isStreaming</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;UNSUPPORTED_OPERATION&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;operation&quot;</span><span class="p">:</span> <span class="s2">&quot;assertDataFrameEqual on streaming DataFrame&quot;</span><span class="p">},</span>
<span class="p">)</span>
<span class="n">expected_list</span> <span class="o">=</span> <span class="n">expected</span><span class="o">.</span><span class="n">collect</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">expected_list</span> <span class="o">=</span> <span class="n">expected</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">checkRowOrder</span><span class="p">:</span>
<span class="c1"># rename duplicate columns for sorting</span>
<span class="n">actual_list</span> <span class="o">=</span> <span class="nb">sorted</span><span class="p">(</span><span class="n">actual_list</span><span class="p">,</span> <span class="n">key</span><span class="o">=</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">x</span><span class="p">))</span>
<span class="n">expected_list</span> <span class="o">=</span> <span class="nb">sorted</span><span class="p">(</span><span class="n">expected_list</span><span class="p">,</span> <span class="n">key</span><span class="o">=</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">x</span><span class="p">))</span>
<span class="n">assert_rows_equal</span><span class="p">(</span><span class="n">actual_list</span><span class="p">,</span> <span class="n">expected_list</span><span class="p">)</span></div>
<span class="k">def</span><span class="w"> </span><span class="nf">_test</span><span class="p">()</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">doctest</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.sql</span><span class="w"> </span><span class="kn">import</span> <span class="n">SparkSession</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">pyspark.testing.utils</span>
<span class="n">globs</span> <span class="o">=</span> <span class="n">pyspark</span><span class="o">.</span><span class="n">testing</span><span class="o">.</span><span class="n">utils</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span><span class="o">.</span><span class="n">builder</span><span class="o">.</span><span class="n">master</span><span class="p">(</span><span class="s2">&quot;local[4]&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">appName</span><span class="p">(</span><span class="s2">&quot;testing.utils tests&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">getOrCreate</span><span class="p">()</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;spark&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">spark</span>
<span class="p">(</span><span class="n">failure_count</span><span class="p">,</span> <span class="n">test_count</span><span class="p">)</span> <span class="o">=</span> <span class="n">doctest</span><span class="o">.</span><span class="n">testmod</span><span class="p">(</span>
<span class="n">pyspark</span><span class="o">.</span><span class="n">testing</span><span class="o">.</span><span class="n">utils</span><span class="p">,</span>
<span class="n">globs</span><span class="o">=</span><span class="n">globs</span><span class="p">,</span>
<span class="n">optionflags</span><span class="o">=</span><span class="n">doctest</span><span class="o">.</span><span class="n">ELLIPSIS</span> <span class="o">|</span> <span class="n">doctest</span><span class="o">.</span><span class="n">NORMALIZE_WHITESPACE</span><span class="p">,</span>
<span class="p">)</span>
<span class="n">spark</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="k">if</span> <span class="n">failure_count</span><span class="p">:</span>
<span class="n">sys</span><span class="o">.</span><span class="n">exit</span><span class="p">(</span><span class="o">-</span><span class="mi">1</span><span class="p">)</span>
<span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">&quot;__main__&quot;</span><span class="p">:</span>
<span class="n">_test</span><span class="p">()</span>
</pre></div>
</div>
<!-- Previous / next navigation area (left empty on this page) -->
<div class="prev-next-area">
</div>
</main>
</div>
</div>
<!-- Theme script; the same URL (including the digest query) is declared in the head with rel="preload" as="script". -->
<script src="../../../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf"></script>
<!-- Page footer: copyright notice and Sphinx attribution. -->
<!-- NOTE(review): the copyright holder is empty in the generated output ("Copyright ."); presumably the Sphinx copyright setting was blank at build time. Confirm and fix in the docs configuration. -->
<footer class="footer mt-5 mt-md-0">
  <div class="container">
    <div class="footer-item">
      <p class="copyright">
        &copy; Copyright .<br>
      </p>
    </div>
    <div class="footer-item">
      <p class="sphinx-version">
        Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 3.0.4.<br>
      </p>
    </div>
  </div>
</footer>
</body>
</html>