blob: 3c86ec152a541d2ed14add1e8f774967b9682e16 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>pyspark.testing.utils &#8212; PySpark 4.0.0-preview1 documentation</title>
<script data-cfasync="false">
{
  // Copy the persisted "mode" and "theme" values from localStorage onto the
  // root element's data-* attributes, falling back to "" and "light".
  const rootData = document.documentElement.dataset;
  rootData.mode = localStorage.getItem("mode") || "";
  rootData.theme = localStorage.getItem("theme") || "light";
}
</script>
<!-- Loaded before other Sphinx assets -->
<link href="../../../_static/styles/theme.css?digest=e353d410970836974a52" rel="stylesheet" />
<link href="../../../_static/styles/bootstrap.css?digest=e353d410970836974a52" rel="stylesheet" />
<link href="../../../_static/styles/pydata-sphinx-theme.css?digest=e353d410970836974a52" rel="stylesheet" />
<link href="../../../_static/vendor/fontawesome/6.1.2/css/all.min.css?digest=e353d410970836974a52" rel="stylesheet" />
<link rel="preload" as="font" type="font/woff2" crossorigin href="../../../_static/vendor/fontawesome/6.1.2/webfonts/fa-solid-900.woff2" />
<link rel="preload" as="font" type="font/woff2" crossorigin href="../../../_static/vendor/fontawesome/6.1.2/webfonts/fa-brands-400.woff2" />
<link rel="preload" as="font" type="font/woff2" crossorigin href="../../../_static/vendor/fontawesome/6.1.2/webfonts/fa-regular-400.woff2" />
<link rel="stylesheet" type="text/css" href="../../../_static/pygments.css" />
<link rel="stylesheet" type="text/css" href="../../../_static/copybutton.css" />
<link rel="stylesheet" type="text/css" href="../../../_static/css/pyspark.css" />
<!-- Pre-loaded scripts that we'll load fully later -->
<link rel="preload" as="script" href="../../../_static/scripts/bootstrap.js?digest=e353d410970836974a52" />
<link rel="preload" as="script" href="../../../_static/scripts/pydata-sphinx-theme.js?digest=e353d410970836974a52" />
<script data-url_root="../../../" id="documentation_options" src="../../../_static/documentation_options.js"></script>
<script src="../../../_static/jquery.js"></script>
<script src="../../../_static/underscore.js"></script>
<script src="../../../_static/doctools.js"></script>
<script src="../../../_static/clipboard.min.js"></script>
<script src="../../../_static/copybutton.js"></script>
<script crossorigin="anonymous" integrity="sha256-Ae2Vz/4ePdIu6ZyI/5ZGsYnb+m0JlOmKPjt6XZ9JJkA=" src="https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.4/require.min.js"></script>
<script>DOCUMENTATION_OPTIONS.pagename = '_modules/pyspark/testing/utils';</script>
<link rel="canonical" href="https://spark.apache.org/docs/latest/api/python/_modules/pyspark/testing/utils.html" />
<link rel="search" title="Search" href="../../../search.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="en">
<!-- Matomo -->
<script type="text/javascript">
// Reuse an existing window._paq command queue if one is present, otherwise
// start a new one; commands are queued by pushing arrays onto it.
var _paq = window._paq = window._paq || [];
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
_paq.push(["disableCookies"]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
// Base URL of the analytics host; both the tracker endpoint and the script
// are addressed relative to it.
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '40']);
// Create an async <script> for matomo.js and insert it before the first
// <script> element in the document.
// NOTE(review): presumably matomo.js consumes the queued _paq commands once it loads - confirm.
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
<!-- End Matomo Code -->
</head>
<body data-bs-spy="scroll" data-bs-target=".bd-toc-nav" data-offset="180" data-bs-root-margin="0px 0px -60%" data-default-mode="">
<a class="skip-link" href="#main-content">Skip to main content</a>
<input type="checkbox"
class="sidebar-toggle"
name="__primary"
id="__primary"/>
<label class="overlay overlay-primary" for="__primary"></label>
<input type="checkbox"
class="sidebar-toggle"
name="__secondary"
id="__secondary"/>
<label class="overlay overlay-secondary" for="__secondary"></label>
<div class="search-button__wrapper">
<div class="search-button__overlay"></div>
<div class="search-button__search-container">
<form class="bd-search d-flex align-items-center"
action="../../../search.html"
method="get">
<i class="fa-solid fa-magnifying-glass"></i>
<input type="search"
class="form-control"
name="q"
id="search-input"
placeholder="Search the docs ..."
aria-label="Search the docs ..."
autocomplete="off"
autocorrect="off"
autocapitalize="off"
spellcheck="false"/>
<span class="search-button__kbd-shortcut"><kbd class="kbd-shortcut__modifier">Ctrl</kbd>+<kbd>K</kbd></span>
</form></div>
</div>
<nav class="bd-header navbar navbar-expand-lg bd-navbar">
<div class="bd-header__inner bd-page-width">
<label class="sidebar-toggle primary-toggle" for="__primary">
<span class="fa-solid fa-bars"></span>
</label>
<div class="navbar-header-items__start">
<div class="navbar-item">
<a class="navbar-brand logo" href="../../../index.html">
<img src="../../../_static/spark-logo-light.png" class="logo__image only-light" alt="Logo image"/>
<script>document.write(`<img src="../../../_static/spark-logo-dark.png" class="logo__image only-dark" alt="Logo image"/>`);</script>
</a></div>
</div>
<div class="col-lg-9 navbar-header-items">
<div class="me-auto navbar-header-items__center">
<div class="navbar-item"><nav class="navbar-nav">
<p class="sidebar-header-items__title"
role="heading"
aria-level="1"
aria-label="Site Navigation">
Site Navigation
</p>
<ul class="bd-navbar-elements navbar-nav">
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../index.html">
Overview
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../user_guide/index.html">
User Guides
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../reference/index.html">
API Reference
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../development/index.html">
Development
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</nav></div>
</div>
<div class="navbar-header-items__end">
<div class="navbar-item navbar-persistent--container">
<script>
// Emit the navbar search button from script, so the button only exists in
// the page when JavaScript is running.
document.write(`
<button class="btn btn-sm navbar-btn search-button search-button__button" title="Search" aria-label="Search" data-bs-placement="bottom" data-bs-toggle="tooltip">
<i class="fa-solid fa-magnifying-glass"></i>
</button>
`);
</script>
</div>
<div class="navbar-item"><!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
<button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-toggle="dropdown">
4.0.0-preview1
<span class="caret"></span>
</button>
<div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
<!-- dropdown will be populated by javascript on page load -->
</div>
</div>
<script type="text/javascript">
// Build the docs homepage URL for one entry of the versions JSON.
function buildURL(entry) {
  // URL pattern supplied by jinja; "{version}" is the placeholder to fill in.
  const pattern = "https://spark.apache.org/docs/{version}/api/python/index.html";
  return pattern.replace("{version}", entry.version);
}
// Click handler for a version-switcher link: check whether the page that is
// currently open also exists in the selected docs version and, if so, go
// there instead of that version's homepage.
//
// event: the click event; the link's "href" attribute is the homepage of the
//        other docs version.
// Returns false so the default navigation is cancelled; the actual redirect
// happens from the request callbacks.
function checkPageExistsAndRedirect(event) {
  const currentFilePath = "_modules/pyspark/testing/utils.html";
  // Prefer currentTarget (the element the handler is bound to) so a click
  // that lands on a descendant node still reads the link's href.
  const link = event.currentTarget || event.target;
  const otherDocsHomepage = link.getAttribute("href");
  // The homepage href ends in ".../index.html", so the page path must be
  // resolved relative to it. Plain concatenation would produce
  // ".../index.html_modules/..." which can never exist, making every click
  // fall back to the homepage.
  const homepageUrl = new URL(otherDocsHomepage, location.href);
  const tryUrl = new URL(currentFilePath, homepageUrl).href;
  $.ajax({
    type: 'HEAD',
    url: tryUrl
  }).done(function() {
    // if the page exists, go there
    location.href = tryUrl;
  }).fail(function() {
    location.href = otherDocsHomepage;
  });
  return false;
}
// Populate the version switcher dropdown from the published versions JSON.
(function populateVersionSwitcher() {
  // This snippet is rendered more than once on the page, so several elements
  // share id="version_switcher" and an id lookup only ever finds the first
  // one. Resolve the dropdown that sits next to *this* <script> instead.
  // This must happen synchronously: document.currentScript is null by the
  // time the JSON callback runs.
  const script = document.currentScript;
  const scoped = script && script.parentNode
    ? script.parentNode.querySelector('[id="version_switcher"]')
    : null;
  const menu = scoped || document.getElementById("version_switcher");
  if (!menu) {
    return;
  }
  // get JSON config
  $.getJSON("https://spark.apache.org/static/versions.json", function(data) {
    // create the nodes first (before AJAX calls) to ensure the order is
    // correct (for now, links will go to doc version homepage)
    $.each(data, function(index, entry) {
      // if no custom name specified (e.g., "latest"), use version string
      if (!("name" in entry)) {
        entry.name = entry.version;
      }
      // construct the appropriate URL, and add it to the dropdown
      entry.url = buildURL(entry);
      const node = document.createElement("a");
      node.setAttribute("class", "list-group-item list-group-item-action py-1");
      node.setAttribute("href", `${entry.url}`);
      node.textContent = `${entry.name}`;
      // On click, try the equivalent page in the other version first.
      node.onclick = checkPageExistsAndRedirect;
      menu.appendChild(node);
    });
  });
})();
</script></div>
<div class="navbar-item">
<script>
// Emit the light/dark/auto theme switch button from script, so the button
// only exists in the page when JavaScript is running.
document.write(`
<button class="theme-switch-button btn btn-sm btn-outline-primary navbar-btn rounded-circle" title="light/dark" aria-label="light/dark" data-bs-placement="bottom" data-bs-toggle="tooltip">
<span class="theme-switch" data-mode="light"><i class="fa-solid fa-sun"></i></span>
<span class="theme-switch" data-mode="dark"><i class="fa-solid fa-moon"></i></span>
<span class="theme-switch" data-mode="auto"><i class="fa-solid fa-circle-half-stroke"></i></span>
</button>
`);
</script></div>
<div class="navbar-item"><ul class="navbar-icon-links navbar-nav"
aria-label="Icon Links">
<li class="nav-item">
<a href="https://github.com/apache/spark" title="GitHub" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-brands fa-github"></i></span>
<span class="sr-only">GitHub</span></a>
</li>
<li class="nav-item">
<a href="https://pypi.org/project/pyspark" title="PyPI" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-solid fa-box"></i></span>
<span class="sr-only">PyPI</span></a>
</li>
</ul></div>
</div>
</div>
<div class="navbar-persistent--mobile">
<script>
// Emit the mobile search button from script, so the button only exists in
// the page when JavaScript is running.
document.write(`
<button class="btn btn-sm navbar-btn search-button search-button__button" title="Search" aria-label="Search" data-bs-placement="bottom" data-bs-toggle="tooltip">
<i class="fa-solid fa-magnifying-glass"></i>
</button>
`);
</script>
</div>
</div>
</nav>
<div class="bd-container">
<div class="bd-container__inner bd-page-width">
<div class="bd-sidebar-primary bd-sidebar hide-on-wide">
<div class="sidebar-header-items sidebar-primary__section">
<div class="sidebar-header-items__center">
<div class="navbar-item"><nav class="navbar-nav">
<p class="sidebar-header-items__title"
role="heading"
aria-level="1"
aria-label="Site Navigation">
Site Navigation
</p>
<ul class="bd-navbar-elements navbar-nav">
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../index.html">
Overview
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../user_guide/index.html">
User Guides
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../reference/index.html">
API Reference
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../development/index.html">
Development
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</nav></div>
</div>
<div class="sidebar-header-items__end">
<div class="navbar-item"><!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
<button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-toggle="dropdown">
4.0.0-preview1
<span class="caret"></span>
</button>
<div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
<!-- dropdown will be populated by javascript on page load -->
</div>
</div>
<script type="text/javascript">
// Build the docs homepage URL for one entry of the versions JSON.
function buildURL(entry) {
  // URL pattern supplied by jinja; "{version}" is the placeholder to fill in.
  const pattern = "https://spark.apache.org/docs/{version}/api/python/index.html";
  return pattern.replace("{version}", entry.version);
}
// Click handler for a version-switcher link: check whether the page that is
// currently open also exists in the selected docs version and, if so, go
// there instead of that version's homepage.
//
// event: the click event; the link's "href" attribute is the homepage of the
//        other docs version.
// Returns false so the default navigation is cancelled; the actual redirect
// happens from the request callbacks.
function checkPageExistsAndRedirect(event) {
  const currentFilePath = "_modules/pyspark/testing/utils.html";
  // Prefer currentTarget (the element the handler is bound to) so a click
  // that lands on a descendant node still reads the link's href.
  const link = event.currentTarget || event.target;
  const otherDocsHomepage = link.getAttribute("href");
  // The homepage href ends in ".../index.html", so the page path must be
  // resolved relative to it. Plain concatenation would produce
  // ".../index.html_modules/..." which can never exist, making every click
  // fall back to the homepage.
  const homepageUrl = new URL(otherDocsHomepage, location.href);
  const tryUrl = new URL(currentFilePath, homepageUrl).href;
  $.ajax({
    type: 'HEAD',
    url: tryUrl
  }).done(function() {
    // if the page exists, go there
    location.href = tryUrl;
  }).fail(function() {
    location.href = otherDocsHomepage;
  });
  return false;
}
// Populate the version switcher dropdown from the published versions JSON.
(function populateVersionSwitcher() {
  // This snippet is rendered more than once on the page, so several elements
  // share id="version_switcher" and an id lookup only ever finds the first
  // one. Resolve the dropdown that sits next to *this* <script> instead.
  // This must happen synchronously: document.currentScript is null by the
  // time the JSON callback runs.
  const script = document.currentScript;
  const scoped = script && script.parentNode
    ? script.parentNode.querySelector('[id="version_switcher"]')
    : null;
  const menu = scoped || document.getElementById("version_switcher");
  if (!menu) {
    return;
  }
  // get JSON config
  $.getJSON("https://spark.apache.org/static/versions.json", function(data) {
    // create the nodes first (before AJAX calls) to ensure the order is
    // correct (for now, links will go to doc version homepage)
    $.each(data, function(index, entry) {
      // if no custom name specified (e.g., "latest"), use version string
      if (!("name" in entry)) {
        entry.name = entry.version;
      }
      // construct the appropriate URL, and add it to the dropdown
      entry.url = buildURL(entry);
      const node = document.createElement("a");
      node.setAttribute("class", "list-group-item list-group-item-action py-1");
      node.setAttribute("href", `${entry.url}`);
      node.textContent = `${entry.name}`;
      // On click, try the equivalent page in the other version first.
      node.onclick = checkPageExistsAndRedirect;
      menu.appendChild(node);
    });
  });
})();
</script></div>
<div class="navbar-item">
<script>
// Emit the light/dark/auto theme switch button from script, so the button
// only exists in the page when JavaScript is running.
document.write(`
<button class="theme-switch-button btn btn-sm btn-outline-primary navbar-btn rounded-circle" title="light/dark" aria-label="light/dark" data-bs-placement="bottom" data-bs-toggle="tooltip">
<span class="theme-switch" data-mode="light"><i class="fa-solid fa-sun"></i></span>
<span class="theme-switch" data-mode="dark"><i class="fa-solid fa-moon"></i></span>
<span class="theme-switch" data-mode="auto"><i class="fa-solid fa-circle-half-stroke"></i></span>
</button>
`);
</script></div>
<div class="navbar-item"><ul class="navbar-icon-links navbar-nav"
aria-label="Icon Links">
<li class="nav-item">
<a href="https://github.com/apache/spark" title="GitHub" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-brands fa-github"></i></span>
<span class="sr-only">GitHub</span></a>
</li>
<li class="nav-item">
<a href="https://pypi.org/project/pyspark" title="PyPI" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-solid fa-box"></i></span>
<span class="sr-only">PyPI</span></a>
</li>
</ul></div>
</div>
</div>
<div class="sidebar-primary-items__end sidebar-primary__section">
</div>
<div id="rtd-footer-container"></div>
</div>
<main id="main-content" class="bd-main">
<div class="bd-content">
<div class="bd-article-container">
<div class="bd-header-article">
<div class="header-article-items header-article__inner">
<div class="header-article-items__start">
<div class="header-article-item">
<nav aria-label="Breadcrumbs">
<ul class="bd-breadcrumbs">
<li class="breadcrumb-item breadcrumb-home">
<a href="../../../index.html" class="nav-link" aria-label="Home">
<i class="fa-solid fa-home"></i>
</a>
</li>
<li class="breadcrumb-item"><a href="../../index.html" class="nav-link">Module code</a></li>
<li class="breadcrumb-item active" aria-current="page">pyspark.testing.utils</li>
</ul>
</nav>
</div>
</div>
</div>
</div>
<div id="searchbox"></div>
<article class="bd-article" role="main">
<h1>Source code for pyspark.testing.utils</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="kn">import</span> <span class="nn">glob</span>
<span class="kn">import</span> <span class="nn">os</span>
<span class="kn">import</span> <span class="nn">struct</span>
<span class="kn">import</span> <span class="nn">sys</span>
<span class="kn">import</span> <span class="nn">unittest</span>
<span class="kn">import</span> <span class="nn">difflib</span>
<span class="kn">import</span> <span class="nn">functools</span>
<span class="kn">import</span> <span class="nn">math</span>
<span class="kn">from</span> <span class="nn">decimal</span> <span class="kn">import</span> <span class="n">Decimal</span>
<span class="kn">from</span> <span class="nn">time</span> <span class="kn">import</span> <span class="n">time</span><span class="p">,</span> <span class="n">sleep</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="p">(</span>
<span class="n">Any</span><span class="p">,</span>
<span class="n">Optional</span><span class="p">,</span>
<span class="n">Union</span><span class="p">,</span>
<span class="n">Dict</span><span class="p">,</span>
<span class="n">List</span><span class="p">,</span>
<span class="n">Callable</span><span class="p">,</span>
<span class="p">)</span>
<span class="kn">from</span> <span class="nn">itertools</span> <span class="kn">import</span> <span class="n">zip_longest</span>
<span class="n">have_scipy</span> <span class="o">=</span> <span class="kc">False</span>
<span class="n">have_numpy</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">scipy</span> <span class="c1"># noqa: F401</span>
<span class="n">have_scipy</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="c1"># No SciPy, but that&#39;s okay, we&#39;ll skip those tests</span>
<span class="k">pass</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">numpy</span> <span class="k">as</span> <span class="nn">np</span> <span class="c1"># noqa: F401</span>
<span class="n">have_numpy</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="c1"># No NumPy, but that&#39;s okay, we&#39;ll skip those tests</span>
<span class="k">pass</span>
<span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkConf</span>
<span class="kn">from</span> <span class="nn">pyspark.errors</span> <span class="kn">import</span> <span class="n">PySparkAssertionError</span><span class="p">,</span> <span class="n">PySparkException</span>
<span class="kn">from</span> <span class="nn">pyspark.errors.exceptions.captured</span> <span class="kn">import</span> <span class="n">CapturedException</span>
<span class="kn">from</span> <span class="nn">pyspark.errors.exceptions.base</span> <span class="kn">import</span> <span class="n">QueryContextType</span>
<span class="kn">from</span> <span class="nn">pyspark.find_spark_home</span> <span class="kn">import</span> <span class="n">_find_spark_home</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.dataframe</span> <span class="kn">import</span> <span class="n">DataFrame</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">Row</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="kn">import</span> <span class="n">StructType</span><span class="p">,</span> <span class="n">StructField</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">col</span><span class="p">,</span> <span class="n">when</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s2">&quot;assertDataFrameEqual&quot;</span><span class="p">,</span> <span class="s2">&quot;assertSchemaEqual&quot;</span><span class="p">]</span>
<span class="n">SPARK_HOME</span> <span class="o">=</span> <span class="n">_find_spark_home</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">read_int</span><span class="p">(</span><span class="n">b</span><span class="p">):</span>
<span class="k">return</span> <span class="n">struct</span><span class="o">.</span><span class="n">unpack</span><span class="p">(</span><span class="s2">&quot;!i&quot;</span><span class="p">,</span> <span class="n">b</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">def</span> <span class="nf">write_int</span><span class="p">(</span><span class="n">i</span><span class="p">):</span>
<span class="k">return</span> <span class="n">struct</span><span class="o">.</span><span class="n">pack</span><span class="p">(</span><span class="s2">&quot;!i&quot;</span><span class="p">,</span> <span class="n">i</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">eventually</span><span class="p">(</span>
<span class="n">timeout</span><span class="o">=</span><span class="mf">30.0</span><span class="p">,</span>
<span class="n">catch_assertions</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Wait a given amount of time for a condition to pass, else fail with an error.</span>
<span class="sd"> This is a helper utility for PySpark tests.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> condition : function</span>
<span class="sd"> Function that checks for termination conditions. condition() can return:</span>
<span class="sd"> - True or None: Conditions met. Return without error.</span>
<span class="sd"> - other value: Conditions not met yet. Continue. Upon timeout,</span>
<span class="sd"> include last such value in error message.</span>
<span class="sd"> Note that this method may be called at any time during</span>
<span class="sd"> streaming execution (e.g., even before any results</span>
<span class="sd"> have been created).</span>
<span class="sd"> timeout : int</span>
<span class="sd"> Number of seconds to wait. Default 30 seconds.</span>
<span class="sd"> catch_assertions : bool</span>
<span class="sd"> If False (default), do not catch AssertionErrors.</span>
<span class="sd"> If True, catch AssertionErrors; continue, but save</span>
<span class="sd"> error to throw upon timeout.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">assert</span> <span class="n">timeout</span> <span class="o">&gt;</span> <span class="mi">0</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">catch_assertions</span><span class="p">,</span> <span class="nb">bool</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">decorator</span><span class="p">(</span><span class="n">condition</span><span class="p">:</span> <span class="n">Callable</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Callable</span><span class="p">:</span>
<span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">condition</span><span class="p">,</span> <span class="n">Callable</span><span class="p">)</span>
<span class="nd">@functools</span><span class="o">.</span><span class="n">wraps</span><span class="p">(</span><span class="n">condition</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">wrapper</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="n">Any</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Any</span><span class="p">:</span>
<span class="n">start_time</span> <span class="o">=</span> <span class="n">time</span><span class="p">()</span>
<span class="n">lastValue</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">numTries</span> <span class="o">=</span> <span class="mi">0</span>
<span class="k">while</span> <span class="n">time</span><span class="p">()</span> <span class="o">-</span> <span class="n">start_time</span> <span class="o">&lt;</span> <span class="n">timeout</span><span class="p">:</span>
<span class="n">numTries</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="k">if</span> <span class="n">catch_assertions</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">lastValue</span> <span class="o">=</span> <span class="n">condition</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">AssertionError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="n">lastValue</span> <span class="o">=</span> <span class="n">e</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">lastValue</span> <span class="o">=</span> <span class="n">condition</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">if</span> <span class="n">lastValue</span> <span class="ow">is</span> <span class="kc">True</span> <span class="ow">or</span> <span class="n">lastValue</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span>
<span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;</span><span class="se">\n</span><span class="s2">Attempt #</span><span class="si">{</span><span class="n">numTries</span><span class="si">}</span><span class="s2"> failed!</span><span class="se">\n</span><span class="si">{</span><span class="n">lastValue</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>
<span class="n">sleep</span><span class="p">(</span><span class="mf">0.01</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">lastValue</span><span class="p">,</span> <span class="ne">AssertionError</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">lastValue</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">AssertionError</span><span class="p">(</span>
<span class="s2">&quot;Test failed due to timeout after </span><span class="si">%g</span><span class="s2"> sec, with last condition returning: </span><span class="si">%s</span><span class="s2">&quot;</span>
<span class="o">%</span> <span class="p">(</span><span class="n">timeout</span><span class="p">,</span> <span class="n">lastValue</span><span class="p">)</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">wrapper</span>
<span class="k">return</span> <span class="n">decorator</span>
<span class="k">class</span> <span class="nc">QuietTest</span><span class="p">:</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sc</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log4j</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">log4j</span>
<span class="k">def</span> <span class="fm">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">old_level</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">log4j</span><span class="o">.</span><span class="n">LogManager</span><span class="o">.</span><span class="n">getRootLogger</span><span class="p">()</span><span class="o">.</span><span class="n">getLevel</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log4j</span><span class="o">.</span><span class="n">LogManager</span><span class="o">.</span><span class="n">getRootLogger</span><span class="p">()</span><span class="o">.</span><span class="n">setLevel</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">log4j</span><span class="o">.</span><span class="n">Level</span><span class="o">.</span><span class="n">FATAL</span><span class="p">)</span>
<span class="k">def</span> <span class="fm">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exc_type</span><span class="p">,</span> <span class="n">exc_val</span><span class="p">,</span> <span class="n">exc_tb</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log4j</span><span class="o">.</span><span class="n">LogManager</span><span class="o">.</span><span class="n">getRootLogger</span><span class="p">()</span><span class="o">.</span><span class="n">setLevel</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">old_level</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">PySparkTestCase</span><span class="p">(</span><span class="n">unittest</span><span class="o">.</span><span class="n">TestCase</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">setUp</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkContext</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_old_sys_path</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">sys</span><span class="o">.</span><span class="n">path</span><span class="p">)</span>
<span class="n">class_name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="s2">&quot;local[4]&quot;</span><span class="p">,</span> <span class="n">class_name</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">tearDown</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sc</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="n">sys</span><span class="o">.</span><span class="n">path</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_old_sys_path</span>
<span class="k">class</span> <span class="nc">ReusedPySparkTestCase</span><span class="p">(</span><span class="n">unittest</span><span class="o">.</span><span class="n">TestCase</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">conf</span><span class="p">(</span><span class="bp">cls</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Override this in subclasses to supply a more specific conf</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">SparkConf</span><span class="p">()</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">setUpClass</span><span class="p">(</span><span class="bp">cls</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkContext</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="s2">&quot;local[4]&quot;</span><span class="p">,</span> <span class="bp">cls</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">conf</span><span class="o">=</span><span class="bp">cls</span><span class="o">.</span><span class="n">conf</span><span class="p">())</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">tearDownClass</span><span class="p">(</span><span class="bp">cls</span><span class="p">):</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">sc</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">test_assert_vanilla_mode</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">is_remote</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertFalse</span><span class="p">(</span><span class="n">is_remote</span><span class="p">())</span>
<span class="k">def</span> <span class="nf">quiet</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">pyspark.testing.utils</span> <span class="kn">import</span> <span class="n">QuietTest</span>
<span class="k">return</span> <span class="n">QuietTest</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sc</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">ByteArrayOutput</span><span class="p">:</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">=</span> <span class="nb">bytearray</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">write</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">b</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">+=</span> <span class="n">b</span>
<span class="k">def</span> <span class="nf">close</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">pass</span>
<span class="k">def</span> <span class="nf">search_jar</span><span class="p">(</span><span class="n">project_relative_path</span><span class="p">,</span> <span class="n">sbt_jar_name_prefix</span><span class="p">,</span> <span class="n">mvn_jar_name_prefix</span><span class="p">):</span>
<span class="c1"># Note that &#39;sbt_jar_name_prefix&#39; and &#39;mvn_jar_name_prefix&#39; are used since the prefix can</span>
<span class="c1"># vary for SBT or Maven specifically. See also SPARK-26856</span>
<span class="n">project_full_path</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">SPARK_HOME</span><span class="p">,</span> <span class="n">project_relative_path</span><span class="p">)</span>
<span class="c1"># We should ignore the following jars</span>
<span class="n">ignored_jar_suffixes</span> <span class="o">=</span> <span class="p">(</span><span class="s2">&quot;javadoc.jar&quot;</span><span class="p">,</span> <span class="s2">&quot;sources.jar&quot;</span><span class="p">,</span> <span class="s2">&quot;test-sources.jar&quot;</span><span class="p">,</span> <span class="s2">&quot;tests.jar&quot;</span><span class="p">)</span>
<span class="c1"># Search for the jar in the project dir using the jar name prefix for both the sbt build and</span>
<span class="c1"># the maven build, because the artifact jars are in different directories.</span>
<span class="n">sbt_build</span> <span class="o">=</span> <span class="n">glob</span><span class="o">.</span><span class="n">glob</span><span class="p">(</span>
<span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">project_full_path</span><span class="p">,</span> <span class="s2">&quot;target/scala-*/</span><span class="si">%s</span><span class="s2">*.jar&quot;</span> <span class="o">%</span> <span class="n">sbt_jar_name_prefix</span><span class="p">)</span>
<span class="p">)</span>
<span class="n">maven_build</span> <span class="o">=</span> <span class="n">glob</span><span class="o">.</span><span class="n">glob</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">project_full_path</span><span class="p">,</span> <span class="s2">&quot;target/</span><span class="si">%s</span><span class="s2">*.jar&quot;</span> <span class="o">%</span> <span class="n">mvn_jar_name_prefix</span><span class="p">))</span>
<span class="n">jar_paths</span> <span class="o">=</span> <span class="n">sbt_build</span> <span class="o">+</span> <span class="n">maven_build</span>
<span class="n">jars</span> <span class="o">=</span> <span class="p">[</span><span class="n">jar</span> <span class="k">for</span> <span class="n">jar</span> <span class="ow">in</span> <span class="n">jar_paths</span> <span class="k">if</span> <span class="ow">not</span> <span class="n">jar</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="n">ignored_jar_suffixes</span><span class="p">)]</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">jars</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">None</span>
<span class="k">elif</span> <span class="nb">len</span><span class="p">(</span><span class="n">jars</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">1</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span><span class="s2">&quot;Found multiple JARs: </span><span class="si">%s</span><span class="s2">; please remove all but one&quot;</span> <span class="o">%</span> <span class="p">(</span><span class="s2">&quot;, &quot;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">jars</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">jars</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">def</span> <span class="nf">_terminal_color_support</span><span class="p">():</span>
<span class="k">try</span><span class="p">:</span>
<span class="c1"># determine if environment supports color</span>
<span class="n">script</span> <span class="o">=</span> <span class="s2">&quot;$(test $(tput colors)) &amp;&amp; $(test $(tput colors) -ge 8) &amp;&amp; echo true || echo false&quot;</span>
<span class="k">return</span> <span class="n">os</span><span class="o">.</span><span class="n">popen</span><span class="p">(</span><span class="n">script</span><span class="p">)</span><span class="o">.</span><span class="n">read</span><span class="p">()</span>
<span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">def</span> <span class="nf">_context_diff</span><span class="p">(</span><span class="n">actual</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">],</span> <span class="n">expected</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">],</span> <span class="n">n</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">3</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Modified from difflib context_diff API,</span>
<span class="sd"> see original code here: https://github.com/python/cpython/blob/main/Lib/difflib.py#L1180</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">red</span><span class="p">(</span><span class="n">s</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">str</span><span class="p">:</span>
<span class="n">red_color</span> <span class="o">=</span> <span class="s2">&quot;</span><span class="se">\033</span><span class="s2">[31m&quot;</span>
<span class="n">no_color</span> <span class="o">=</span> <span class="s2">&quot;</span><span class="se">\033</span><span class="s2">[0m&quot;</span>
<span class="k">return</span> <span class="n">red_color</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">s</span><span class="p">)</span> <span class="o">+</span> <span class="n">no_color</span>
<span class="n">prefix</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">insert</span><span class="o">=</span><span class="s2">&quot;+ &quot;</span><span class="p">,</span> <span class="n">delete</span><span class="o">=</span><span class="s2">&quot;- &quot;</span><span class="p">,</span> <span class="n">replace</span><span class="o">=</span><span class="s2">&quot;! &quot;</span><span class="p">,</span> <span class="n">equal</span><span class="o">=</span><span class="s2">&quot; &quot;</span><span class="p">)</span>
<span class="k">for</span> <span class="n">group</span> <span class="ow">in</span> <span class="n">difflib</span><span class="o">.</span><span class="n">SequenceMatcher</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">actual</span><span class="p">,</span> <span class="n">expected</span><span class="p">)</span><span class="o">.</span><span class="n">get_grouped_opcodes</span><span class="p">(</span><span class="n">n</span><span class="p">):</span>
<span class="k">yield</span> <span class="s2">&quot;*** actual ***&quot;</span>
<span class="k">if</span> <span class="nb">any</span><span class="p">(</span><span class="n">tag</span> <span class="ow">in</span> <span class="p">{</span><span class="s2">&quot;replace&quot;</span><span class="p">,</span> <span class="s2">&quot;delete&quot;</span><span class="p">}</span> <span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">group</span><span class="p">):</span>
<span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">i1</span><span class="p">,</span> <span class="n">i2</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">group</span><span class="p">:</span>
<span class="k">for</span> <span class="n">line</span> <span class="ow">in</span> <span class="n">actual</span><span class="p">[</span><span class="n">i1</span><span class="p">:</span><span class="n">i2</span><span class="p">]:</span>
<span class="k">if</span> <span class="n">tag</span> <span class="o">!=</span> <span class="s2">&quot;equal&quot;</span> <span class="ow">and</span> <span class="n">_terminal_color_support</span><span class="p">():</span>
<span class="k">yield</span> <span class="n">red</span><span class="p">(</span><span class="n">prefix</span><span class="p">[</span><span class="n">tag</span><span class="p">]</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">line</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">prefix</span><span class="p">[</span><span class="n">tag</span><span class="p">]</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">line</span><span class="p">)</span>
<span class="k">yield</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span>
<span class="k">yield</span> <span class="s2">&quot;*** expected ***&quot;</span>
<span class="k">if</span> <span class="nb">any</span><span class="p">(</span><span class="n">tag</span> <span class="ow">in</span> <span class="p">{</span><span class="s2">&quot;replace&quot;</span><span class="p">,</span> <span class="s2">&quot;insert&quot;</span><span class="p">}</span> <span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">group</span><span class="p">):</span>
<span class="k">for</span> <span class="n">tag</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">j1</span><span class="p">,</span> <span class="n">j2</span> <span class="ow">in</span> <span class="n">group</span><span class="p">:</span>
<span class="k">for</span> <span class="n">line</span> <span class="ow">in</span> <span class="n">expected</span><span class="p">[</span><span class="n">j1</span><span class="p">:</span><span class="n">j2</span><span class="p">]:</span>
<span class="k">if</span> <span class="n">tag</span> <span class="o">!=</span> <span class="s2">&quot;equal&quot;</span> <span class="ow">and</span> <span class="n">_terminal_color_support</span><span class="p">():</span>
<span class="k">yield</span> <span class="n">red</span><span class="p">(</span><span class="n">prefix</span><span class="p">[</span><span class="n">tag</span><span class="p">]</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">line</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">prefix</span><span class="p">[</span><span class="n">tag</span><span class="p">]</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">line</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">PySparkErrorTestUtils</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> This util provides functions for accurate and consistent error testing</span>
<span class="sd"> based on PySpark error classes.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">check_error</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">exception</span><span class="p">:</span> <span class="n">PySparkException</span><span class="p">,</span>
<span class="n">error_class</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">query_context_type</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">QueryContextType</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">pyspark_fragment</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">):</span>
<span class="n">query_context</span> <span class="o">=</span> <span class="n">exception</span><span class="o">.</span><span class="n">getQueryContext</span><span class="p">()</span>
<span class="k">assert</span> <span class="nb">bool</span><span class="p">(</span><span class="n">query_context</span><span class="p">)</span> <span class="o">==</span> <span class="p">(</span><span class="n">query_context_type</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">),</span> <span class="p">(</span>
<span class="s2">&quot;`query_context_type` is required when QueryContext exists. &quot;</span>
<span class="sa">f</span><span class="s2">&quot;QueryContext: </span><span class="si">{</span><span class="n">query_context</span><span class="si">}</span><span class="s2">.&quot;</span>
<span class="p">)</span>
<span class="c1"># Test if given error is an instance of PySparkException.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertIsInstance</span><span class="p">(</span>
<span class="n">exception</span><span class="p">,</span>
<span class="n">PySparkException</span><span class="p">,</span>
<span class="sa">f</span><span class="s2">&quot;checkError requires &#39;PySparkException&#39;, got &#39;</span><span class="si">{</span><span class="n">exception</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="si">}</span><span class="s2">&#39;.&quot;</span><span class="p">,</span>
<span class="p">)</span>
<span class="c1"># Test error class</span>
<span class="n">expected</span> <span class="o">=</span> <span class="n">error_class</span>
<span class="n">actual</span> <span class="o">=</span> <span class="n">exception</span><span class="o">.</span><span class="n">getErrorClass</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertEqual</span><span class="p">(</span>
<span class="n">expected</span><span class="p">,</span> <span class="n">actual</span><span class="p">,</span> <span class="sa">f</span><span class="s2">&quot;Expected error class was &#39;</span><span class="si">{</span><span class="n">expected</span><span class="si">}</span><span class="s2">&#39;, got &#39;</span><span class="si">{</span><span class="n">actual</span><span class="si">}</span><span class="s2">&#39;.&quot;</span>
<span class="p">)</span>
<span class="c1"># Test message parameters</span>
<span class="n">expected</span> <span class="o">=</span> <span class="n">message_parameters</span>
<span class="n">actual</span> <span class="o">=</span> <span class="n">exception</span><span class="o">.</span><span class="n">getMessageParameters</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertEqual</span><span class="p">(</span>
<span class="n">expected</span><span class="p">,</span> <span class="n">actual</span><span class="p">,</span> <span class="sa">f</span><span class="s2">&quot;Expected message parameters was &#39;</span><span class="si">{</span><span class="n">expected</span><span class="si">}</span><span class="s2">&#39;, got &#39;</span><span class="si">{</span><span class="n">actual</span><span class="si">}</span><span class="s2">&#39;&quot;</span>
<span class="p">)</span>
<span class="c1"># Test query context</span>
<span class="k">if</span> <span class="n">query_context</span><span class="p">:</span>
<span class="n">expected</span> <span class="o">=</span> <span class="n">query_context_type</span>
<span class="n">actual_contexts</span> <span class="o">=</span> <span class="n">exception</span><span class="o">.</span><span class="n">getQueryContext</span><span class="p">()</span>
<span class="k">for</span> <span class="n">actual_context</span> <span class="ow">in</span> <span class="n">actual_contexts</span><span class="p">:</span>
<span class="n">actual</span> <span class="o">=</span> <span class="n">actual_context</span><span class="o">.</span><span class="n">contextType</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertEqual</span><span class="p">(</span>
<span class="n">expected</span><span class="p">,</span> <span class="n">actual</span><span class="p">,</span> <span class="sa">f</span><span class="s2">&quot;Expected QueryContext was &#39;</span><span class="si">{</span><span class="n">expected</span><span class="si">}</span><span class="s2">&#39;, got &#39;</span><span class="si">{</span><span class="n">actual</span><span class="si">}</span><span class="s2">&#39;&quot;</span>
<span class="p">)</span>
<span class="k">if</span> <span class="n">actual</span> <span class="o">==</span> <span class="n">QueryContextType</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">:</span>
<span class="k">assert</span> <span class="p">(</span>
<span class="n">pyspark_fragment</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="p">),</span> <span class="s2">&quot;`pyspark_fragment` is required when QueryContextType is DataFrame.&quot;</span>
<span class="n">expected</span> <span class="o">=</span> <span class="n">pyspark_fragment</span>
<span class="n">actual</span> <span class="o">=</span> <span class="n">actual_context</span><span class="o">.</span><span class="n">pysparkFragment</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertEqual</span><span class="p">(</span>
<span class="n">expected</span><span class="p">,</span>
<span class="n">actual</span><span class="p">,</span>
<span class="sa">f</span><span class="s2">&quot;Expected PySpark fragment was &#39;</span><span class="si">{</span><span class="n">expected</span><span class="si">}</span><span class="s2">&#39;, got &#39;</span><span class="si">{</span><span class="n">actual</span><span class="si">}</span><span class="s2">&#39;&quot;</span><span class="p">,</span>
<span class="p">)</span>
<div class="viewcode-block" id="assertSchemaEqual"><a class="viewcode-back" href="../../../reference/api/pyspark.testing.assertSchemaEqual.html#pyspark.testing.assertSchemaEqual">[docs]</a><span class="k">def</span> <span class="nf">assertSchemaEqual</span><span class="p">(</span>
<span class="n">actual</span><span class="p">:</span> <span class="n">StructType</span><span class="p">,</span>
<span class="n">expected</span><span class="p">:</span> <span class="n">StructType</span><span class="p">,</span>
<span class="n">ignoreNullable</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">True</span><span class="p">,</span>
<span class="n">ignoreColumnOrder</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="n">ignoreColumnName</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="p">):</span>
<span class="w"> </span><span class="sa">r</span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A util function to assert equality between DataFrame schemas `actual` and `expected`.</span>
<span class="sd"> .. versionadded:: 3.5.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> actual : StructType</span>
<span class="sd"> The DataFrame schema that is being compared or tested.</span>
<span class="sd"> expected : StructType</span>
<span class="sd"> The expected schema, for comparison with the actual schema.</span>
<span class="sd"> ignoreNullable : bool, default True</span>
<span class="sd"> Specifies whether a column’s nullable property is included when checking for</span>
<span class="sd"> schema equality.</span>
<span class="sd"> When set to `True` (default), the nullable property of the columns being compared</span>
<span class="sd"> is not taken into account and the columns will be considered equal even if they have</span>
<span class="sd"> different nullable settings.</span>
<span class="sd"> When set to `False`, columns are considered equal only if they have the same nullable</span>
<span class="sd"> setting.</span>
<span class="sd"> .. versionadded:: 4.0.0</span>
<span class="sd"> ignoreColumnOrder : bool, default False</span>
<span class="sd"> Specifies whether to compare columns in the order they appear in the DataFrame or by</span>
<span class="sd"> column name.</span>
<span class="sd"> If set to `False` (default), columns are compared in the order they appear in the</span>
<span class="sd"> DataFrames.</span>
<span class="sd"> When set to `True`, a column in the expected DataFrame is compared to the column with the</span>
<span class="sd"> same name in the actual DataFrame.</span>
<span class="sd"> .. versionadded:: 4.0.0</span>
<span class="sd"> ignoreColumnName : bool, default False</span>
<span class="sd"> Specifies whether to fail the initial schema equality check if the column names in the two</span>
<span class="sd"> DataFrames are different.</span>
<span class="sd"> When set to `False` (default), column names are checked and the function fails if they are</span>
<span class="sd"> different.</span>
<span class="sd"> When set to `True`, the function will succeed even if column names are different.</span>
<span class="sd"> Column data types are compared for columns in the order they appear in the DataFrames.</span>
<span class="sd"> .. versionadded:: 4.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> When assertSchemaEqual fails, the error message uses the Python `difflib` library to display</span>
<span class="sd"> a diff log of the `actual` and `expected` schemas.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, DoubleType</span>
<span class="sd"> &gt;&gt;&gt; s1 = StructType([StructField(&quot;names&quot;, ArrayType(DoubleType(), True), True)])</span>
<span class="sd"> &gt;&gt;&gt; s2 = StructType([StructField(&quot;names&quot;, ArrayType(DoubleType(), True), True)])</span>
<span class="sd"> &gt;&gt;&gt; assertSchemaEqual(s1, s2) # pass, schemas are identical</span>
<span class="sd"> Different schemas with `ignoreNullable=False` would fail.</span>
<span class="sd"> &gt;&gt;&gt; s3 = StructType([StructField(&quot;names&quot;, ArrayType(DoubleType(), True), False)])</span>
<span class="sd"> &gt;&gt;&gt; assertSchemaEqual(s1, s3, ignoreNullable=False) # doctest: +IGNORE_EXCEPTION_DETAIL</span>
<span class="sd"> Traceback (most recent call last):</span>
<span class="sd"> ...</span>
<span class="sd"> PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.</span>
<span class="sd"> --- actual</span>
<span class="sd"> +++ expected</span>
<span class="sd"> - StructType([StructField(&#39;names&#39;, ArrayType(DoubleType(), True), True)])</span>
<span class="sd"> ? ^^^</span>
<span class="sd"> + StructType([StructField(&#39;names&#39;, ArrayType(DoubleType(), True), False)])</span>
<span class="sd"> ? ^^^^</span>
<span class="sd"> &gt;&gt;&gt; df1 = spark.createDataFrame(data=[(1, 1000), (2, 3000)], schema=[&quot;id&quot;, &quot;number&quot;])</span>
<span class="sd"> &gt;&gt;&gt; df2 = spark.createDataFrame(data=[(&quot;1&quot;, 1000), (&quot;2&quot;, 5000)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; assertSchemaEqual(df1.schema, df2.schema) # doctest: +IGNORE_EXCEPTION_DETAIL</span>
<span class="sd"> Traceback (most recent call last):</span>
<span class="sd"> ...</span>
<span class="sd"> PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.</span>
<span class="sd"> --- actual</span>
<span class="sd"> +++ expected</span>
<span class="sd"> - StructType([StructField(&#39;id&#39;, LongType(), True), StructField(&#39;number&#39;, LongType(), True)])</span>
<span class="sd"> ? ^^ ^^^^^</span>
<span class="sd"> + StructType([StructField(&#39;id&#39;, StringType(), True), StructField(&#39;amount&#39;, LongType(), True)])</span>
<span class="sd"> ? ^^^^ ++++ ^</span>
<span class="sd"> Compare two schemas ignoring the column order.</span>
<span class="sd"> &gt;&gt;&gt; s1 = StructType(</span>
<span class="sd"> ... [StructField(&quot;a&quot;, IntegerType(), True), StructField(&quot;b&quot;, DoubleType(), True)]</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; s2 = StructType(</span>
<span class="sd"> ... [StructField(&quot;b&quot;, DoubleType(), True), StructField(&quot;a&quot;, IntegerType(), True)]</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; assertSchemaEqual(s1, s2, ignoreColumnOrder=True)</span>
<span class="sd"> Compare two schemas ignoring the column names.</span>
<span class="sd"> &gt;&gt;&gt; s1 = StructType(</span>
<span class="sd"> ... [StructField(&quot;a&quot;, IntegerType(), True), StructField(&quot;c&quot;, DoubleType(), True)]</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; s2 = StructType(</span>
<span class="sd"> ... [StructField(&quot;b&quot;, IntegerType(), True), StructField(&quot;d&quot;, DoubleType(), True)]</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; assertSchemaEqual(s1, s2, ignoreColumnName=True)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="n">StructType</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;UNSUPPORTED_DATA_TYPE&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;data_type&quot;</span><span class="p">:</span> <span class="nb">type</span><span class="p">(</span><span class="n">actual</span><span class="p">)},</span>
<span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expected</span><span class="p">,</span> <span class="n">StructType</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;UNSUPPORTED_DATA_TYPE&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;data_type&quot;</span><span class="p">:</span> <span class="nb">type</span><span class="p">(</span><span class="n">expected</span><span class="p">)},</span>
<span class="p">)</span>
<span class="k">def</span> <span class="nf">compare_schemas_ignore_nullable</span><span class="p">(</span><span class="n">s1</span><span class="p">:</span> <span class="n">StructType</span><span class="p">,</span> <span class="n">s2</span><span class="p">:</span> <span class="n">StructType</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">s1</span><span class="p">)</span> <span class="o">!=</span> <span class="nb">len</span><span class="p">(</span><span class="n">s2</span><span class="p">):</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="n">zipped</span> <span class="o">=</span> <span class="n">zip_longest</span><span class="p">(</span><span class="n">s1</span><span class="p">,</span> <span class="n">s2</span><span class="p">)</span>
<span class="k">for</span> <span class="n">sf1</span><span class="p">,</span> <span class="n">sf2</span> <span class="ow">in</span> <span class="n">zipped</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">compare_structfields_ignore_nullable</span><span class="p">(</span><span class="n">sf1</span><span class="p">,</span> <span class="n">sf2</span><span class="p">):</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">def</span> <span class="nf">compare_structfields_ignore_nullable</span><span class="p">(</span><span class="n">actualSF</span><span class="p">:</span> <span class="n">StructField</span><span class="p">,</span> <span class="n">expectedSF</span><span class="p">:</span> <span class="n">StructField</span><span class="p">):</span>
<span class="k">if</span> <span class="n">actualSF</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">expectedSF</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">elif</span> <span class="n">actualSF</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">or</span> <span class="n">expectedSF</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">if</span> <span class="n">actualSF</span><span class="o">.</span><span class="n">name</span> <span class="o">!=</span> <span class="n">expectedSF</span><span class="o">.</span><span class="n">name</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">compare_datatypes_ignore_nullable</span><span class="p">(</span><span class="n">actualSF</span><span class="o">.</span><span class="n">dataType</span><span class="p">,</span> <span class="n">expectedSF</span><span class="o">.</span><span class="n">dataType</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">compare_datatypes_ignore_nullable</span><span class="p">(</span><span class="n">dt1</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="n">dt2</span><span class="p">:</span> <span class="n">Any</span><span class="p">):</span>
<span class="c1"># checks datatype equality, using recursion to ignore nullable</span>
<span class="k">if</span> <span class="n">dt1</span><span class="o">.</span><span class="n">typeName</span><span class="p">()</span> <span class="o">==</span> <span class="n">dt2</span><span class="o">.</span><span class="n">typeName</span><span class="p">():</span>
<span class="k">if</span> <span class="n">dt1</span><span class="o">.</span><span class="n">typeName</span><span class="p">()</span> <span class="o">==</span> <span class="s2">&quot;array&quot;</span><span class="p">:</span>
<span class="k">return</span> <span class="n">compare_datatypes_ignore_nullable</span><span class="p">(</span><span class="n">dt1</span><span class="o">.</span><span class="n">elementType</span><span class="p">,</span> <span class="n">dt2</span><span class="o">.</span><span class="n">elementType</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">dt1</span><span class="o">.</span><span class="n">typeName</span><span class="p">()</span> <span class="o">==</span> <span class="s2">&quot;struct&quot;</span><span class="p">:</span>
<span class="k">return</span> <span class="n">compare_schemas_ignore_nullable</span><span class="p">(</span><span class="n">dt1</span><span class="p">,</span> <span class="n">dt2</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">if</span> <span class="n">ignoreColumnOrder</span><span class="p">:</span>
<span class="n">actual</span> <span class="o">=</span> <span class="n">StructType</span><span class="p">(</span><span class="nb">sorted</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="n">key</span><span class="o">=</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="o">.</span><span class="n">name</span><span class="p">))</span>
<span class="n">expected</span> <span class="o">=</span> <span class="n">StructType</span><span class="p">(</span><span class="nb">sorted</span><span class="p">(</span><span class="n">expected</span><span class="p">,</span> <span class="n">key</span><span class="o">=</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="o">.</span><span class="n">name</span><span class="p">))</span>
<span class="k">if</span> <span class="n">ignoreColumnName</span><span class="p">:</span>
<span class="n">actual</span> <span class="o">=</span> <span class="n">StructType</span><span class="p">(</span>
<span class="p">[</span><span class="n">StructField</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">i</span><span class="p">),</span> <span class="n">field</span><span class="o">.</span><span class="n">dataType</span><span class="p">,</span> <span class="n">field</span><span class="o">.</span><span class="n">nullable</span><span class="p">)</span> <span class="k">for</span> <span class="n">i</span><span class="p">,</span> <span class="n">field</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="n">actual</span><span class="p">)]</span>
<span class="p">)</span>
<span class="n">expected</span> <span class="o">=</span> <span class="n">StructType</span><span class="p">(</span>
<span class="p">[</span>
<span class="n">StructField</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">i</span><span class="p">),</span> <span class="n">field</span><span class="o">.</span><span class="n">dataType</span><span class="p">,</span> <span class="n">field</span><span class="o">.</span><span class="n">nullable</span><span class="p">)</span>
<span class="k">for</span> <span class="n">i</span><span class="p">,</span> <span class="n">field</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="n">expected</span><span class="p">)</span>
<span class="p">]</span>
<span class="p">)</span>
<span class="k">if</span> <span class="p">(</span><span class="n">ignoreNullable</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">compare_schemas_ignore_nullable</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="n">expected</span><span class="p">))</span> <span class="ow">or</span> <span class="p">(</span>
<span class="ow">not</span> <span class="n">ignoreNullable</span> <span class="ow">and</span> <span class="n">actual</span> <span class="o">!=</span> <span class="n">expected</span>
<span class="p">):</span>
<span class="n">generated_diff</span> <span class="o">=</span> <span class="n">difflib</span><span class="o">.</span><span class="n">ndiff</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">actual</span><span class="p">)</span><span class="o">.</span><span class="n">splitlines</span><span class="p">(),</span> <span class="nb">str</span><span class="p">(</span><span class="n">expected</span><span class="p">)</span><span class="o">.</span><span class="n">splitlines</span><span class="p">())</span>
<span class="n">error_msg</span> <span class="o">=</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">generated_diff</span><span class="p">)</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;DIFFERENT_SCHEMA&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;error_msg&quot;</span><span class="p">:</span> <span class="n">error_msg</span><span class="p">},</span>
<span class="p">)</span></div>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">TYPE_CHECKING</span>
<span class="k">if</span> <span class="n">TYPE_CHECKING</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">pandas</span>
<span class="kn">import</span> <span class="nn">pyspark.pandas</span>
<div class="viewcode-block" id="assertDataFrameEqual"><a class="viewcode-back" href="../../../reference/api/pyspark.testing.assertDataFrameEqual.html#pyspark.testing.assertDataFrameEqual">[docs]</a><span class="k">def</span> <span class="nf">assertDataFrameEqual</span><span class="p">(</span>
<span class="n">actual</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">DataFrame</span><span class="p">,</span> <span class="s2">&quot;pandas.DataFrame&quot;</span><span class="p">,</span> <span class="s2">&quot;pyspark.pandas.DataFrame&quot;</span><span class="p">,</span> <span class="n">List</span><span class="p">[</span><span class="n">Row</span><span class="p">]],</span>
<span class="n">expected</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">DataFrame</span><span class="p">,</span> <span class="s2">&quot;pandas.DataFrame&quot;</span><span class="p">,</span> <span class="s2">&quot;pyspark.pandas.DataFrame&quot;</span><span class="p">,</span> <span class="n">List</span><span class="p">[</span><span class="n">Row</span><span class="p">]],</span>
<span class="n">checkRowOrder</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="n">rtol</span><span class="p">:</span> <span class="nb">float</span> <span class="o">=</span> <span class="mf">1e-5</span><span class="p">,</span>
<span class="n">atol</span><span class="p">:</span> <span class="nb">float</span> <span class="o">=</span> <span class="mf">1e-8</span><span class="p">,</span>
<span class="n">ignoreNullable</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">True</span><span class="p">,</span>
<span class="n">ignoreColumnOrder</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="n">ignoreColumnName</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="n">ignoreColumnType</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="n">maxErrors</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">showOnlyDiff</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="n">includeDiffRows</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="p">):</span>
<span class="w"> </span><span class="sa">r</span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A util function to assert equality between `actual` and `expected`</span>
<span class="sd"> (DataFrames or lists of Rows), with optional parameters `checkRowOrder`, `rtol`, and `atol`.</span>
<span class="sd"> Supports Spark, Spark Connect, pandas, and pandas-on-Spark DataFrames.</span>
<span class="sd"> For more information about pandas-on-Spark DataFrame equality, see the docs for</span>
<span class="sd"> `assertPandasOnSparkEqual`.</span>
<span class="sd"> .. versionadded:: 3.5.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> actual : DataFrame (Spark, Spark Connect, pandas, or pandas-on-Spark) or list of Rows</span>
<span class="sd"> The DataFrame that is being compared or tested.</span>
<span class="sd"> expected : DataFrame (Spark, Spark Connect, pandas, or pandas-on-Spark) or list of Rows</span>
<span class="sd"> The expected result of the operation, for comparison with the actual result.</span>
<span class="sd"> checkRowOrder : bool, optional</span>
<span class="sd"> A flag indicating whether the order of rows should be considered in the comparison.</span>
<span class="sd"> If set to `False` (default), the row order is not taken into account.</span>
<span class="sd"> If set to `True`, the order of rows is important and will be checked during comparison.</span>
<span class="sd"> (See Notes)</span>
<span class="sd"> rtol : float, optional</span>
<span class="sd"> The relative tolerance, used in asserting approximate equality for float values in actual</span>
<span class="sd"> and expected. Set to 1e-5 by default. (See Notes)</span>
<span class="sd"> atol : float, optional</span>
<span class="sd"> The absolute tolerance, used in asserting approximate equality for float values in actual</span>
<span class="sd"> and expected. Set to 1e-8 by default. (See Notes)</span>
<span class="sd"> ignoreNullable : bool, default True</span>
<span class="sd"> Specifies whether a column’s nullable property is included when checking for</span>
<span class="sd"> schema equality.</span>
<span class="sd"> When set to `True` (default), the nullable property of the columns being compared</span>
<span class="sd"> is not taken into account and the columns will be considered equal even if they have</span>
<span class="sd"> different nullable settings.</span>
<span class="sd"> When set to `False`, columns are considered equal only if they have the same nullable</span>
<span class="sd"> setting.</span>
<span class="sd"> .. versionadded:: 4.0.0</span>
<span class="sd"> ignoreColumnOrder : bool, default False</span>
<span class="sd"> Specifies whether to compare columns in the order they appear in the DataFrame or by</span>
<span class="sd"> column name.</span>
<span class="sd"> If set to `False` (default), columns are compared in the order they appear in the</span>
<span class="sd"> DataFrames.</span>
<span class="sd"> When set to `True`, a column in the expected DataFrame is compared to the column with the</span>
<span class="sd"> same name in the actual DataFrame.</span>
<span class="sd"> .. versionadded:: 4.0.0</span>
<span class="sd"> ignoreColumnName : bool, default False</span>
<span class="sd"> Specifies whether to fail the initial schema equality check if the column names in the two</span>
<span class="sd"> DataFrames are different.</span>
<span class="sd"> When set to `False` (default), column names are checked and the function fails if they are</span>
<span class="sd"> different.</span>
<span class="sd"> When set to `True`, the function will succeed even if column names are different.</span>
<span class="sd"> Column data types are compared for columns in the order they appear in the DataFrames.</span>
<span class="sd"> .. versionadded:: 4.0.0</span>
<span class="sd"> ignoreColumnType : bool, default False</span>
<span class="sd"> Specifies whether to ignore the data type of the columns when comparing.</span>
<span class="sd"> When set to `False` (default), column data types are checked and the function fails if they</span>
<span class="sd"> are different.</span>
<span class="sd"> When set to `True`, the schema equality check will succeed even if column data types are</span>
<span class="sd"> different and the function will attempt to compare rows.</span>
<span class="sd"> .. versionadded:: 4.0.0</span>
<span class="sd"> maxErrors : int, optional</span>
<span class="sd"> The maximum number of row comparison failures to encounter before returning.</span>
<span class="sd"> When this number of row comparisons have failed, the function returns independent of how</span>
<span class="sd"> many rows have been compared.</span>
<span class="sd"> Set to None by default which means compare all rows independent of number of failures.</span>
<span class="sd"> .. versionadded:: 4.0.0</span>
<span class="sd"> showOnlyDiff : bool, default False</span>
<span class="sd"> If set to `True`, the error message will only include rows that are different.</span>
<span class="sd"> If set to `False` (default), the error message will include all rows</span>
<span class="sd"> (when there is at least one row that is different).</span>
<span class="sd"> .. versionadded:: 4.0.0</span>
<span class="sd"> includeDiffRows : bool, default False</span>
<span class="sd"> If set to `True`, the unequal rows are included in PySparkAssertionError for further</span>
<span class="sd"> debugging. If set to `False` (default), the unequal rows are not returned as a data set.</span>
<span class="sd"> .. versionadded:: 4.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> When `assertDataFrameEqual` fails, the error message uses the Python `difflib` library to</span>
<span class="sd"> display a diff log of each row that differs in `actual` and `expected`.</span>
<span class="sd"> For `checkRowOrder`, note that PySpark DataFrame ordering is non-deterministic, unless</span>
<span class="sd"> explicitly sorted.</span>
<span class="sd"> Note that schema equality is checked only when `expected` is a DataFrame (not a list of Rows).</span>
<span class="sd"> For DataFrames with float/decimal values, assertDataFrameEqual asserts approximate equality.</span>
<span class="sd"> Two float/decimal values a and b are approximately equal if the following equation is True:</span>
<span class="sd"> ``absolute(a - b) &lt;= (atol + rtol * absolute(b))``.</span>
<span class="sd"> `ignoreColumnOrder` cannot be set to `True` if `ignoreColumnName` is also set to `True`.</span>
<span class="sd"> `ignoreColumnName` cannot be set to `True` if `ignoreColumnOrder` is also set to `True`.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; df1 = spark.createDataFrame(data=[(&quot;1&quot;, 1000), (&quot;2&quot;, 3000)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; df2 = spark.createDataFrame(data=[(&quot;1&quot;, 1000), (&quot;2&quot;, 3000)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1, df2) # pass, DataFrames are identical</span>
<span class="sd"> &gt;&gt;&gt; df1 = spark.createDataFrame(data=[(&quot;1&quot;, 0.1), (&quot;2&quot;, 3.23)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; df2 = spark.createDataFrame(data=[(&quot;1&quot;, 0.109), (&quot;2&quot;, 3.23)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1, df2, rtol=1e-1) # pass, DataFrames are approx equal by rtol</span>
<span class="sd"> &gt;&gt;&gt; df1 = spark.createDataFrame(data=[(1, 1000), (2, 3000)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; list_of_rows = [Row(1, 1000), Row(2, 3000)]</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1, list_of_rows) # pass, actual and expected data are equal</span>
<span class="sd"> &gt;&gt;&gt; import pyspark.pandas as ps # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; df1 = ps.DataFrame({&#39;a&#39;: [1, 2, 3], &#39;b&#39;: [4, 5, 6], &#39;c&#39;: [7, 8, 9]}) # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; df2 = ps.DataFrame({&#39;a&#39;: [1, 2, 3], &#39;b&#39;: [4, 5, 6], &#39;c&#39;: [7, 8, 9]}) # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; # pass, pandas-on-Spark DataFrames are equal</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1, df2) # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; df1 = spark.createDataFrame(</span>
<span class="sd"> ... data=[(&quot;1&quot;, 1000.00), (&quot;2&quot;, 3000.00), (&quot;3&quot;, 2000.00)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; df2 = spark.createDataFrame(</span>
<span class="sd"> ... data=[(&quot;1&quot;, 1001.00), (&quot;2&quot;, 3000.00), (&quot;3&quot;, 2003.00)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1, df2) # doctest: +IGNORE_EXCEPTION_DETAIL</span>
<span class="sd"> Traceback (most recent call last):</span>
<span class="sd"> ...</span>
<span class="sd"> PySparkAssertionError: [DIFFERENT_ROWS] Results do not match: ( 66.66667 % )</span>
<span class="sd"> *** actual ***</span>
<span class="sd"> ! Row(id=&#39;1&#39;, amount=1000.0)</span>
<span class="sd"> Row(id=&#39;2&#39;, amount=3000.0)</span>
<span class="sd"> ! Row(id=&#39;3&#39;, amount=2000.0)</span>
<span class="sd"> *** expected ***</span>
<span class="sd"> ! Row(id=&#39;1&#39;, amount=1001.0)</span>
<span class="sd"> Row(id=&#39;2&#39;, amount=3000.0)</span>
<span class="sd"> ! Row(id=&#39;3&#39;, amount=2003.0)</span>
<span class="sd"> Example for ignoreNullable</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.types import StructType, StructField, StringType, LongType</span>
<span class="sd"> &gt;&gt;&gt; df1_nullable = spark.createDataFrame(</span>
<span class="sd"> ... data=[(1000, &quot;1&quot;), (5000, &quot;2&quot;)],</span>
<span class="sd"> ... schema=StructType(</span>
<span class="sd"> ... [StructField(&quot;amount&quot;, LongType(), True), StructField(&quot;id&quot;, StringType(), True)]</span>
<span class="sd"> ... )</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; df2_nullable = spark.createDataFrame(</span>
<span class="sd"> ... data=[(1000, &quot;1&quot;), (5000, &quot;2&quot;)],</span>
<span class="sd"> ... schema=StructType(</span>
<span class="sd"> ... [StructField(&quot;amount&quot;, LongType(), True), StructField(&quot;id&quot;, StringType(), False)]</span>
<span class="sd"> ... )</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1_nullable, df2_nullable, ignoreNullable=True) # pass</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(</span>
<span class="sd"> ... df1_nullable, df2_nullable, ignoreNullable=False</span>
<span class="sd"> ... ) # doctest: +IGNORE_EXCEPTION_DETAIL</span>
<span class="sd"> Traceback (most recent call last):</span>
<span class="sd"> ...</span>
<span class="sd"> PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.</span>
<span class="sd"> --- actual</span>
<span class="sd"> +++ expected</span>
<span class="sd"> - StructType([StructField(&#39;amount&#39;, LongType(), True), StructField(&#39;id&#39;, StringType(), True)])</span>
<span class="sd"> ? ^^^</span>
<span class="sd"> + StructType([StructField(&#39;amount&#39;, LongType(), True), StructField(&#39;id&#39;, StringType(), False)])</span>
<span class="sd"> ? ^^^^</span>
<span class="sd"> Example for ignoreColumnOrder</span>
<span class="sd"> &gt;&gt;&gt; df1_col_order = spark.createDataFrame(</span>
<span class="sd"> ... data=[(1000, &quot;1&quot;), (5000, &quot;2&quot;)], schema=[&quot;amount&quot;, &quot;id&quot;]</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; df2_col_order = spark.createDataFrame(</span>
<span class="sd"> ... data=[(&quot;1&quot;, 1000), (&quot;2&quot;, 5000)], schema=[&quot;id&quot;, &quot;amount&quot;]</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1_col_order, df2_col_order, ignoreColumnOrder=True)</span>
<span class="sd"> Example for ignoreColumnName</span>
<span class="sd"> &gt;&gt;&gt; df1_col_names = spark.createDataFrame(</span>
<span class="sd"> ... data=[(1000, &quot;1&quot;), (5000, &quot;2&quot;)], schema=[&quot;amount&quot;, &quot;identity&quot;]</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; df2_col_names = spark.createDataFrame(</span>
<span class="sd"> ... data=[(1000, &quot;1&quot;), (5000, &quot;2&quot;)], schema=[&quot;amount&quot;, &quot;id&quot;]</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1_col_names, df2_col_names, ignoreColumnName=True)</span>
<span class="sd"> Example for ignoreColumnType</span>
<span class="sd"> &gt;&gt;&gt; df1_col_types = spark.createDataFrame(</span>
<span class="sd"> ... data=[(1000, &quot;1&quot;), (5000, &quot;2&quot;)], schema=[&quot;amount&quot;, &quot;id&quot;]</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; df2_col_types = spark.createDataFrame(</span>
<span class="sd"> ... data=[(1000.0, &quot;1&quot;), (5000.0, &quot;2&quot;)], schema=[&quot;amount&quot;, &quot;id&quot;]</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1_col_types, df2_col_types, ignoreColumnType=True)</span>
<span class="sd"> Example for maxErrors (will only report the first mismatching row)</span>
<span class="sd"> &gt;&gt;&gt; df1 = spark.createDataFrame([(1, &quot;A&quot;), (2, &quot;B&quot;), (3, &quot;C&quot;)])</span>
<span class="sd"> &gt;&gt;&gt; df2 = spark.createDataFrame([(1, &quot;A&quot;), (2, &quot;X&quot;), (3, &quot;Y&quot;)])</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1, df2, maxErrors=1) # doctest: +IGNORE_EXCEPTION_DETAIL</span>
<span class="sd"> Traceback (most recent call last):</span>
<span class="sd"> ...</span>
<span class="sd"> PySparkAssertionError: [DIFFERENT_ROWS] Results do not match: ( 33.33333 % )</span>
<span class="sd"> *** actual ***</span>
<span class="sd"> Row(_1=1, _2=&#39;A&#39;)</span>
<span class="sd"> ! Row(_1=2, _2=&#39;B&#39;)</span>
<span class="sd"> *** expected ***</span>
<span class="sd"> Row(_1=1, _2=&#39;A&#39;)</span>
<span class="sd"> ! Row(_1=2, _2=&#39;X&#39;)</span>
<span class="sd"> Example for showOnlyDiff (will only report the mismatching rows)</span>
<span class="sd"> &gt;&gt;&gt; df1 = spark.createDataFrame([(1, &quot;A&quot;), (2, &quot;B&quot;), (3, &quot;C&quot;)])</span>
<span class="sd"> &gt;&gt;&gt; df2 = spark.createDataFrame([(1, &quot;A&quot;), (2, &quot;X&quot;), (3, &quot;Y&quot;)])</span>
<span class="sd"> &gt;&gt;&gt; assertDataFrameEqual(df1, df2, showOnlyDiff=True) # doctest: +IGNORE_EXCEPTION_DETAIL</span>
<span class="sd"> Traceback (most recent call last):</span>
<span class="sd"> ...</span>
<span class="sd"> PySparkAssertionError: [DIFFERENT_ROWS] Results do not match: ( 66.66667 % )</span>
<span class="sd"> *** actual ***</span>
<span class="sd"> ! Row(_1=2, _2=&#39;B&#39;)</span>
<span class="sd"> ! Row(_1=3, _2=&#39;C&#39;)</span>
<span class="sd"> *** expected ***</span>
<span class="sd"> ! Row(_1=2, _2=&#39;X&#39;)</span>
<span class="sd"> ! Row(_1=3, _2=&#39;Y&#39;)</span>
<span class="sd"> The `includeDiffRows` parameter can be used to include the rows that did not match</span>
<span class="sd"> in the PySparkAssertionError. This can be useful for debugging or further analysis.</span>
<span class="sd"> &gt;&gt;&gt; df1 = spark.createDataFrame(</span>
<span class="sd"> ... data=[(&quot;1&quot;, 1000.00), (&quot;2&quot;, 3000.00), (&quot;3&quot;, 2000.00)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; df2 = spark.createDataFrame(</span>
<span class="sd"> ... data=[(&quot;1&quot;, 1001.00), (&quot;2&quot;, 3000.00), (&quot;3&quot;, 2003.00)], schema=[&quot;id&quot;, &quot;amount&quot;])</span>
<span class="sd"> &gt;&gt;&gt; try:</span>
<span class="sd"> ... assertDataFrameEqual(df1, df2, includeDiffRows=True)</span>
<span class="sd"> ... except PySparkAssertionError as e:</span>
<span class="sd"> ... spark.createDataFrame(e.data).show() # doctest: +NORMALIZE_WHITESPACE</span>
<span class="sd"> +-----------+-----------+</span>
<span class="sd"> | _1| _2|</span>
<span class="sd"> +-----------+-----------+</span>
<span class="sd"> |{1, 1000.0}|{1, 1001.0}|</span>
<span class="sd"> |{3, 2000.0}|{3, 2003.0}|</span>
<span class="sd"> +-----------+-----------+</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">actual</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">expected</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">elif</span> <span class="n">actual</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_TYPE_DF_EQUALITY_ARG&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;expected_type&quot;</span><span class="p">:</span> <span class="s2">&quot;Union[DataFrame, ps.DataFrame, List[Row]]&quot;</span><span class="p">,</span>
<span class="s2">&quot;arg_name&quot;</span><span class="p">:</span> <span class="s2">&quot;actual&quot;</span><span class="p">,</span>
<span class="s2">&quot;actual_type&quot;</span><span class="p">:</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="n">expected</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_TYPE_DF_EQUALITY_ARG&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;expected_type&quot;</span><span class="p">:</span> <span class="s2">&quot;Union[DataFrame, ps.DataFrame, List[Row]]&quot;</span><span class="p">,</span>
<span class="s2">&quot;arg_name&quot;</span><span class="p">:</span> <span class="s2">&quot;expected&quot;</span><span class="p">,</span>
<span class="s2">&quot;actual_type&quot;</span><span class="p">:</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="n">has_pandas</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">try</span><span class="p">:</span>
<span class="c1"># If pandas dependencies are available, allow pandas or pandas-on-Spark DataFrame</span>
<span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span>
<span class="n">has_pandas</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="c1"># no pandas, so we won&#39;t call pandasutils functions</span>
<span class="k">pass</span>
<span class="n">has_arrow</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">pyarrow</span>
<span class="n">has_arrow</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="k">pass</span>
<span class="k">if</span> <span class="n">has_pandas</span> <span class="ow">and</span> <span class="n">has_arrow</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">pyspark.pandas</span> <span class="k">as</span> <span class="nn">ps</span>
<span class="kn">from</span> <span class="nn">pyspark.testing.pandasutils</span> <span class="kn">import</span> <span class="n">PandasOnSparkTestUtils</span>
<span class="k">if</span> <span class="p">(</span>
<span class="nb">isinstance</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span>
<span class="ow">or</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expected</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span>
<span class="ow">or</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="n">ps</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span>
<span class="ow">or</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expected</span><span class="p">,</span> <span class="n">ps</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span>
<span class="p">):</span>
<span class="c1"># handle pandas DataFrames</span>
<span class="c1"># assert approximate equality for float data</span>
<span class="k">return</span> <span class="n">PandasOnSparkTestUtils</span><span class="p">()</span><span class="o">.</span><span class="n">assert_eq</span><span class="p">(</span>
<span class="n">actual</span><span class="p">,</span> <span class="n">expected</span><span class="p">,</span> <span class="n">almost</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">rtol</span><span class="o">=</span><span class="n">rtol</span><span class="p">,</span> <span class="n">atol</span><span class="o">=</span><span class="n">atol</span><span class="p">,</span> <span class="n">check_row_order</span><span class="o">=</span><span class="n">checkRowOrder</span>
<span class="p">)</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.utils</span> <span class="kn">import</span> <span class="n">get_dataframe_class</span>
<span class="c1"># if is_remote(), allow Connect DataFrame</span>
<span class="n">SparkDataFrame</span> <span class="o">=</span> <span class="n">get_dataframe_class</span><span class="p">()</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="p">(</span><span class="n">DataFrame</span><span class="p">,</span> <span class="n">SparkDataFrame</span><span class="p">,</span> <span class="nb">list</span><span class="p">)):</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_TYPE_DF_EQUALITY_ARG&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;expected_type&quot;</span><span class="p">:</span> <span class="s2">&quot;Union[DataFrame, ps.DataFrame, List[Row]]&quot;</span><span class="p">,</span>
<span class="s2">&quot;arg_name&quot;</span><span class="p">:</span> <span class="s2">&quot;actual&quot;</span><span class="p">,</span>
<span class="s2">&quot;actual_type&quot;</span><span class="p">:</span> <span class="nb">type</span><span class="p">(</span><span class="n">actual</span><span class="p">),</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expected</span><span class="p">,</span> <span class="p">(</span><span class="n">DataFrame</span><span class="p">,</span> <span class="n">SparkDataFrame</span><span class="p">,</span> <span class="nb">list</span><span class="p">)):</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_TYPE_DF_EQUALITY_ARG&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;expected_type&quot;</span><span class="p">:</span> <span class="s2">&quot;Union[DataFrame, ps.DataFrame, List[Row]]&quot;</span><span class="p">,</span>
<span class="s2">&quot;arg_name&quot;</span><span class="p">:</span> <span class="s2">&quot;expected&quot;</span><span class="p">,</span>
<span class="s2">&quot;actual_type&quot;</span><span class="p">:</span> <span class="nb">type</span><span class="p">(</span><span class="n">expected</span><span class="p">),</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">if</span> <span class="n">ignoreColumnOrder</span><span class="p">:</span>
<span class="n">actual</span> <span class="o">=</span> <span class="n">actual</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="o">*</span><span class="nb">sorted</span><span class="p">(</span><span class="n">actual</span><span class="o">.</span><span class="n">columns</span><span class="p">))</span>
<span class="n">expected</span> <span class="o">=</span> <span class="n">expected</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="o">*</span><span class="nb">sorted</span><span class="p">(</span><span class="n">expected</span><span class="o">.</span><span class="n">columns</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">rename_dataframe_columns</span><span class="p">(</span><span class="n">df</span><span class="p">:</span> <span class="n">DataFrame</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">DataFrame</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Rename DataFrame columns to sequential numbers for comparison&quot;&quot;&quot;</span>
<span class="n">renamed_columns</span> <span class="o">=</span> <span class="p">[</span><span class="nb">str</span><span class="p">(</span><span class="n">i</span><span class="p">)</span> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="nb">len</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">columns</span><span class="p">))]</span>
<span class="k">return</span> <span class="n">df</span><span class="o">.</span><span class="n">toDF</span><span class="p">(</span><span class="o">*</span><span class="n">renamed_columns</span><span class="p">)</span>
<span class="k">if</span> <span class="n">ignoreColumnName</span><span class="p">:</span>
<span class="n">actual</span> <span class="o">=</span> <span class="n">rename_dataframe_columns</span><span class="p">(</span><span class="n">actual</span><span class="p">)</span>
<span class="n">expected</span> <span class="o">=</span> <span class="n">rename_dataframe_columns</span><span class="p">(</span><span class="n">expected</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">cast_columns_to_string</span><span class="p">(</span><span class="n">df</span><span class="p">:</span> <span class="n">DataFrame</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">DataFrame</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Cast all DataFrame columns to string for comparison&quot;&quot;&quot;</span>
<span class="k">for</span> <span class="n">col_name</span> <span class="ow">in</span> <span class="n">df</span><span class="o">.</span><span class="n">columns</span><span class="p">:</span>
<span class="c1"># Drop the trailing .0 from values whose float and int casts are equal (whole numbers)</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">withColumn</span><span class="p">(</span>
<span class="n">col_name</span><span class="p">,</span>
<span class="n">when</span><span class="p">(</span>
<span class="p">(</span><span class="n">col</span><span class="p">(</span><span class="n">col_name</span><span class="p">)</span><span class="o">.</span><span class="n">cast</span><span class="p">(</span><span class="s2">&quot;float&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">isNotNull</span><span class="p">())</span>
<span class="o">&amp;</span> <span class="p">(</span><span class="n">col</span><span class="p">(</span><span class="n">col_name</span><span class="p">)</span><span class="o">.</span><span class="n">cast</span><span class="p">(</span><span class="s2">&quot;float&quot;</span><span class="p">)</span> <span class="o">==</span> <span class="n">col</span><span class="p">(</span><span class="n">col_name</span><span class="p">)</span><span class="o">.</span><span class="n">cast</span><span class="p">(</span><span class="s2">&quot;int&quot;</span><span class="p">)),</span>
<span class="n">col</span><span class="p">(</span><span class="n">col_name</span><span class="p">)</span><span class="o">.</span><span class="n">cast</span><span class="p">(</span><span class="s2">&quot;int&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">cast</span><span class="p">(</span><span class="s2">&quot;string&quot;</span><span class="p">),</span>
<span class="p">)</span><span class="o">.</span><span class="n">otherwise</span><span class="p">(</span><span class="n">col</span><span class="p">(</span><span class="n">col_name</span><span class="p">)</span><span class="o">.</span><span class="n">cast</span><span class="p">(</span><span class="s2">&quot;string&quot;</span><span class="p">)),</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">df</span>
<span class="k">if</span> <span class="n">ignoreColumnType</span><span class="p">:</span>
<span class="n">actual</span> <span class="o">=</span> <span class="n">cast_columns_to_string</span><span class="p">(</span><span class="n">actual</span><span class="p">)</span>
<span class="n">expected</span> <span class="o">=</span> <span class="n">cast_columns_to_string</span><span class="p">(</span><span class="n">expected</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">compare_rows</span><span class="p">(</span><span class="n">r1</span><span class="p">:</span> <span class="n">Row</span><span class="p">,</span> <span class="n">r2</span><span class="p">:</span> <span class="n">Row</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">compare_vals</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="n">val2</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="nb">list</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val2</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="n">val1</span><span class="p">)</span> <span class="o">==</span> <span class="nb">len</span><span class="p">(</span><span class="n">val2</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">all</span><span class="p">(</span>
<span class="n">compare_vals</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">)</span> <span class="k">for</span> <span class="n">x</span><span class="p">,</span> <span class="n">y</span> <span class="ow">in</span> <span class="nb">zip</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="n">val2</span><span class="p">)</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="n">Row</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val2</span><span class="p">,</span> <span class="n">Row</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">all</span><span class="p">(</span><span class="n">compare_vals</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">)</span> <span class="k">for</span> <span class="n">x</span><span class="p">,</span> <span class="n">y</span> <span class="ow">in</span> <span class="nb">zip</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="n">val2</span><span class="p">))</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="nb">dict</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val2</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span>
<span class="nb">len</span><span class="p">(</span><span class="n">val1</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span> <span class="o">==</span> <span class="nb">len</span><span class="p">(</span><span class="n">val2</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span>
<span class="ow">and</span> <span class="n">val1</span><span class="o">.</span><span class="n">keys</span><span class="p">()</span> <span class="o">==</span> <span class="n">val2</span><span class="o">.</span><span class="n">keys</span><span class="p">()</span>
<span class="ow">and</span> <span class="nb">all</span><span class="p">(</span><span class="n">compare_vals</span><span class="p">(</span><span class="n">val1</span><span class="p">[</span><span class="n">k</span><span class="p">],</span> <span class="n">val2</span><span class="p">[</span><span class="n">k</span><span class="p">])</span> <span class="k">for</span> <span class="n">k</span> <span class="ow">in</span> <span class="n">val1</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="nb">float</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val2</span><span class="p">,</span> <span class="nb">float</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">abs</span><span class="p">(</span><span class="n">val1</span> <span class="o">-</span> <span class="n">val2</span><span class="p">)</span> <span class="o">&gt;</span> <span class="p">(</span><span class="n">atol</span> <span class="o">+</span> <span class="n">rtol</span> <span class="o">*</span> <span class="nb">abs</span><span class="p">(</span><span class="n">val2</span><span class="p">)):</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val1</span><span class="p">,</span> <span class="n">Decimal</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">val2</span><span class="p">,</span> <span class="n">Decimal</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">abs</span><span class="p">(</span><span class="n">val1</span> <span class="o">-</span> <span class="n">val2</span><span class="p">)</span> <span class="o">&gt;</span> <span class="p">(</span><span class="n">Decimal</span><span class="p">(</span><span class="n">atol</span><span class="p">)</span> <span class="o">+</span> <span class="n">Decimal</span><span class="p">(</span><span class="n">rtol</span><span class="p">)</span> <span class="o">*</span> <span class="nb">abs</span><span class="p">(</span><span class="n">val2</span><span class="p">)):</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">if</span> <span class="n">val1</span> <span class="o">!=</span> <span class="n">val2</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">if</span> <span class="n">r1</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">r2</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">elif</span> <span class="n">r1</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">or</span> <span class="n">r2</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">return</span> <span class="n">compare_vals</span><span class="p">(</span><span class="n">r1</span><span class="p">,</span> <span class="n">r2</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">assert_rows_equal</span><span class="p">(</span>
<span class="n">rows1</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="n">Row</span><span class="p">],</span> <span class="n">rows2</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="n">Row</span><span class="p">],</span> <span class="n">maxErrors</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> <span class="n">showOnlyDiff</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span>
<span class="p">):</span>
<span class="n">zipped</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">zip_longest</span><span class="p">(</span><span class="n">rows1</span><span class="p">,</span> <span class="n">rows2</span><span class="p">))</span>
<span class="n">diff_rows_cnt</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">diff_rows</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">has_diff_rows</span> <span class="o">=</span> <span class="kc">False</span>
<span class="n">rows_str1</span> <span class="o">=</span> <span class="s2">&quot;&quot;</span>
<span class="n">rows_str2</span> <span class="o">=</span> <span class="s2">&quot;&quot;</span>
<span class="c1"># count differing rows and build the row strings used to generate the diff</span>
<span class="k">for</span> <span class="n">r1</span><span class="p">,</span> <span class="n">r2</span> <span class="ow">in</span> <span class="n">zipped</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">compare_rows</span><span class="p">(</span><span class="n">r1</span><span class="p">,</span> <span class="n">r2</span><span class="p">):</span>
<span class="n">diff_rows_cnt</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="n">has_diff_rows</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">if</span> <span class="n">includeDiffRows</span><span class="p">:</span>
<span class="n">diff_rows</span><span class="o">.</span><span class="n">append</span><span class="p">((</span><span class="n">r1</span><span class="p">,</span> <span class="n">r2</span><span class="p">))</span>
<span class="n">rows_str1</span> <span class="o">+=</span> <span class="nb">str</span><span class="p">(</span><span class="n">r1</span><span class="p">)</span> <span class="o">+</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span>
<span class="n">rows_str2</span> <span class="o">+=</span> <span class="nb">str</span><span class="p">(</span><span class="n">r2</span><span class="p">)</span> <span class="o">+</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span>
<span class="k">if</span> <span class="n">maxErrors</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">diff_rows_cnt</span> <span class="o">&gt;=</span> <span class="n">maxErrors</span><span class="p">:</span>
<span class="k">break</span>
<span class="k">elif</span> <span class="ow">not</span> <span class="n">showOnlyDiff</span><span class="p">:</span>
<span class="n">rows_str1</span> <span class="o">+=</span> <span class="nb">str</span><span class="p">(</span><span class="n">r1</span><span class="p">)</span> <span class="o">+</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span>
<span class="n">rows_str2</span> <span class="o">+=</span> <span class="nb">str</span><span class="p">(</span><span class="n">r2</span><span class="p">)</span> <span class="o">+</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span>
<span class="n">generated_diff</span> <span class="o">=</span> <span class="n">_context_diff</span><span class="p">(</span>
<span class="n">actual</span><span class="o">=</span><span class="n">rows_str1</span><span class="o">.</span><span class="n">splitlines</span><span class="p">(),</span> <span class="n">expected</span><span class="o">=</span><span class="n">rows_str2</span><span class="o">.</span><span class="n">splitlines</span><span class="p">(),</span> <span class="n">n</span><span class="o">=</span><span class="nb">len</span><span class="p">(</span><span class="n">zipped</span><span class="p">)</span>
<span class="p">)</span>
<span class="k">if</span> <span class="n">has_diff_rows</span><span class="p">:</span>
<span class="n">error_msg</span> <span class="o">=</span> <span class="s2">&quot;Results do not match: &quot;</span>
<span class="n">percent_diff</span> <span class="o">=</span> <span class="p">(</span><span class="n">diff_rows_cnt</span> <span class="o">/</span> <span class="nb">len</span><span class="p">(</span><span class="n">zipped</span><span class="p">))</span> <span class="o">*</span> <span class="mi">100</span>
<span class="n">error_msg</span> <span class="o">+=</span> <span class="s2">&quot;( </span><span class="si">%.5f</span><span class="s2"> </span><span class="si">%%</span><span class="s2"> )&quot;</span> <span class="o">%</span> <span class="n">percent_diff</span>
<span class="n">error_msg</span> <span class="o">+=</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span> <span class="o">+</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">generated_diff</span><span class="p">)</span>
<span class="n">data</span> <span class="o">=</span> <span class="n">diff_rows</span> <span class="k">if</span> <span class="n">includeDiffRows</span> <span class="k">else</span> <span class="kc">None</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;DIFFERENT_ROWS&quot;</span><span class="p">,</span> <span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;error_msg&quot;</span><span class="p">:</span> <span class="n">error_msg</span><span class="p">},</span> <span class="n">data</span><span class="o">=</span><span class="n">data</span>
<span class="p">)</span>
<span class="c1"># only compare schemas when neither actual nor expected is a List</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="nb">list</span><span class="p">)</span> <span class="ow">and</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expected</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="k">if</span> <span class="n">ignoreNullable</span><span class="p">:</span>
<span class="n">assertSchemaEqual</span><span class="p">(</span><span class="n">actual</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="n">expected</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">actual</span><span class="o">.</span><span class="n">schema</span> <span class="o">!=</span> <span class="n">expected</span><span class="o">.</span><span class="n">schema</span><span class="p">:</span>
<span class="n">generated_diff</span> <span class="o">=</span> <span class="n">difflib</span><span class="o">.</span><span class="n">ndiff</span><span class="p">(</span>
<span class="nb">str</span><span class="p">(</span><span class="n">actual</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span><span class="o">.</span><span class="n">splitlines</span><span class="p">(),</span> <span class="nb">str</span><span class="p">(</span><span class="n">expected</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span><span class="o">.</span><span class="n">splitlines</span><span class="p">()</span>
<span class="p">)</span>
<span class="n">error_msg</span> <span class="o">=</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">generated_diff</span><span class="p">)</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;DIFFERENT_SCHEMA&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;error_msg&quot;</span><span class="p">:</span> <span class="n">error_msg</span><span class="p">},</span>
<span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">actual</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="k">if</span> <span class="n">actual</span><span class="o">.</span><span class="n">isStreaming</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;UNSUPPORTED_OPERATION&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;operation&quot;</span><span class="p">:</span> <span class="s2">&quot;assertDataFrameEqual on streaming DataFrame&quot;</span><span class="p">},</span>
<span class="p">)</span>
<span class="n">actual_list</span> <span class="o">=</span> <span class="n">actual</span><span class="o">.</span><span class="n">collect</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">actual_list</span> <span class="o">=</span> <span class="n">actual</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">expected</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="k">if</span> <span class="n">expected</span><span class="o">.</span><span class="n">isStreaming</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkAssertionError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;UNSUPPORTED_OPERATION&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;operation&quot;</span><span class="p">:</span> <span class="s2">&quot;assertDataFrameEqual on streaming DataFrame&quot;</span><span class="p">},</span>
<span class="p">)</span>
<span class="n">expected_list</span> <span class="o">=</span> <span class="n">expected</span><span class="o">.</span><span class="n">collect</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">expected_list</span> <span class="o">=</span> <span class="n">expected</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">checkRowOrder</span><span class="p">:</span>
<span class="c1"># rename duplicate columns for sorting</span>
<span class="n">actual_list</span> <span class="o">=</span> <span class="nb">sorted</span><span class="p">(</span><span class="n">actual_list</span><span class="p">,</span> <span class="n">key</span><span class="o">=</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">x</span><span class="p">))</span>
<span class="n">expected_list</span> <span class="o">=</span> <span class="nb">sorted</span><span class="p">(</span><span class="n">expected_list</span><span class="p">,</span> <span class="n">key</span><span class="o">=</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">x</span><span class="p">))</span>
<span class="n">assert_rows_equal</span><span class="p">(</span><span class="n">actual_list</span><span class="p">,</span> <span class="n">expected_list</span><span class="p">,</span> <span class="n">maxErrors</span><span class="o">=</span><span class="n">maxErrors</span><span class="p">,</span> <span class="n">showOnlyDiff</span><span class="o">=</span><span class="n">showOnlyDiff</span><span class="p">)</span></div>
<span class="k">def</span> <span class="nf">_test</span><span class="p">()</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">doctest</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span>
<span class="kn">import</span> <span class="nn">pyspark.testing.utils</span>
<span class="n">globs</span> <span class="o">=</span> <span class="n">pyspark</span><span class="o">.</span><span class="n">testing</span><span class="o">.</span><span class="n">utils</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span><span class="o">.</span><span class="n">builder</span><span class="o">.</span><span class="n">master</span><span class="p">(</span><span class="s2">&quot;local[4]&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">appName</span><span class="p">(</span><span class="s2">&quot;testing.utils tests&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">getOrCreate</span><span class="p">()</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;spark&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">spark</span>
<span class="p">(</span><span class="n">failure_count</span><span class="p">,</span> <span class="n">test_count</span><span class="p">)</span> <span class="o">=</span> <span class="n">doctest</span><span class="o">.</span><span class="n">testmod</span><span class="p">(</span>
<span class="n">pyspark</span><span class="o">.</span><span class="n">testing</span><span class="o">.</span><span class="n">utils</span><span class="p">,</span>
<span class="n">globs</span><span class="o">=</span><span class="n">globs</span><span class="p">,</span>
<span class="n">optionflags</span><span class="o">=</span><span class="n">doctest</span><span class="o">.</span><span class="n">ELLIPSIS</span> <span class="o">|</span> <span class="n">doctest</span><span class="o">.</span><span class="n">NORMALIZE_WHITESPACE</span><span class="p">,</span>
<span class="p">)</span>
<span class="n">spark</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="k">if</span> <span class="n">failure_count</span><span class="p">:</span>
<span class="n">sys</span><span class="o">.</span><span class="n">exit</span><span class="p">(</span><span class="o">-</span><span class="mi">1</span><span class="p">)</span>
<span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">&quot;__main__&quot;</span><span class="p">:</span>
<span class="n">_test</span><span class="p">()</span>
</pre></div>
</article>
<!-- Article footer: wraps the previous/next navigation area, which contains no links on this page. -->
<footer class="bd-footer-article">
  <div class="footer-article-items footer-article__inner">
    <div class="footer-article-item">
      <!-- Previous / next buttons -->
      <div class="prev-next-area">
      </div>
    </div>
  </div>
</footer>
</div>
</div>
<!-- Content-area footer; it has no child elements on this page. -->
<footer class="bd-footer-content">
</footer>
</main>
</div>
</div>
<!-- Theme scripts: both URLs are preloaded from <head> and executed here, after the main content markup, so fetching and running them does not hold up parsing of that content. -->
<script src="../../../_static/scripts/bootstrap.js?digest=e353d410970836974a52"></script>
<script src="../../../_static/scripts/pydata-sphinx-theme.js?digest=e353d410970836974a52"></script>
<!-- Page footer: copyright/licence notice and Sphinx version in the start group, theme version in the end group. -->
<footer class="bd-footer">
  <div class="bd-footer__inner bd-page-width">
    <div class="footer-items__start">
      <div class="footer-item">
        <p class="copyright">
          Copyright © 2024 The Apache Software Foundation, Licensed under the <a href="https://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
        </p>
      </div>
      <div class="footer-item">
        <p class="sphinx-version">
          Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 4.5.0.
          <br/>
        </p>
      </div>
    </div>
    <div class="footer-items__end">
      <div class="footer-item">
        <p class="theme-version">
          Built with the <a href="https://pydata-sphinx-theme.readthedocs.io/en/stable/index.html">PyData Sphinx Theme</a> 0.13.3.
        </p>
      </div>
    </div>
  </div>
</footer>
</body>
</html>