Source code for pyspark.util (PySpark 4.0.0-preview2)
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
<span class="kn">import</span> <span class="nn">copy</span>
<span class="kn">import</span> <span class="nn">functools</span>
<span class="kn">import</span> <span class="nn">itertools</span>
<span class="kn">import</span> <span class="nn">os</span>
<span class="kn">import</span> <span class="nn">platform</span>
<span class="kn">import</span> <span class="nn">re</span>
<span class="kn">import</span> <span class="nn">sys</span>
<span class="kn">import</span> <span class="nn">threading</span>
<span class="kn">import</span> <span class="nn">traceback</span>
<span class="kn">import</span> <span class="nn">typing</span>
<span class="kn">import</span> <span class="nn">socket</span>
<span class="kn">from</span> <span class="nn">types</span> <span class="kn">import</span> <span class="n">TracebackType</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Any</span><span class="p">,</span> <span class="n">Callable</span><span class="p">,</span> <span class="n">IO</span><span class="p">,</span> <span class="n">Iterator</span><span class="p">,</span> <span class="n">List</span><span class="p">,</span> <span class="n">Optional</span><span class="p">,</span> <span class="n">TextIO</span><span class="p">,</span> <span class="n">Tuple</span><span class="p">,</span> <span class="n">Union</span>
<span class="kn">from</span> <span class="nn">pyspark.errors</span> <span class="kn">import</span> <span class="n">PySparkRuntimeError</span>
<span class="kn">from</span> <span class="nn">pyspark.serializers</span> <span class="kn">import</span> <span class="p">(</span>
<span class="n">write_int</span><span class="p">,</span>
<span class="n">read_int</span><span class="p">,</span>
<span class="n">write_with_length</span><span class="p">,</span>
<span class="n">SpecialLengths</span><span class="p">,</span>
<span class="n">UTF8Deserializer</span><span class="p">,</span>
<span class="p">)</span>
<span class="n">__all__</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="n">typing</span><span class="o">.</span><span class="n">TYPE_CHECKING</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">io</span>
<span class="kn">from</span> <span class="nn">py4j.java_collections</span> <span class="kn">import</span> <span class="n">JavaArray</span>
<span class="kn">from</span> <span class="nn">py4j.java_gateway</span> <span class="kn">import</span> <span class="n">JavaObject</span>
<span class="kn">from</span> <span class="nn">pyspark._typing</span> <span class="kn">import</span> <span class="n">NonUDFType</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.pandas._typing</span> <span class="kn">import</span> <span class="p">(</span>
<span class="n">PandasScalarUDFType</span><span class="p">,</span>
<span class="n">PandasGroupedMapUDFType</span><span class="p">,</span>
<span class="n">PandasGroupedAggUDFType</span><span class="p">,</span>
<span class="n">PandasWindowAggUDFType</span><span class="p">,</span>
<span class="n">PandasScalarIterUDFType</span><span class="p">,</span>
<span class="n">PandasMapIterUDFType</span><span class="p">,</span>
<span class="n">PandasCogroupedMapUDFType</span><span class="p">,</span>
<span class="n">ArrowMapIterUDFType</span><span class="p">,</span>
<span class="n">PandasGroupedMapUDFWithStateType</span><span class="p">,</span>
<span class="n">ArrowGroupedMapUDFType</span><span class="p">,</span>
<span class="n">ArrowCogroupedMapUDFType</span><span class="p">,</span>
<span class="n">PandasGroupedMapUDFTransformWithStateType</span><span class="p">,</span>
<span class="p">)</span>
<span class="kn">from</span> <span class="nn">pyspark.sql._typing</span> <span class="kn">import</span> <span class="p">(</span>
<span class="n">SQLArrowBatchedUDFType</span><span class="p">,</span>
<span class="n">SQLArrowTableUDFType</span><span class="p">,</span>
<span class="n">SQLBatchedUDFType</span><span class="p">,</span>
<span class="n">SQLTableUDFType</span><span class="p">,</span>
<span class="p">)</span>
<span class="kn">from</span> <span class="nn">pyspark.serializers</span> <span class="kn">import</span> <span class="n">Serializer</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span>
<span class="n">JVM_BYTE_MIN</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="o">-</span><span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">7</span><span class="p">)</span>
<span class="n">JVM_BYTE_MAX</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">7</span><span class="p">)</span> <span class="o">-</span> <span class="mi">1</span>
<span class="n">JVM_SHORT_MIN</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="o">-</span><span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">15</span><span class="p">)</span>
<span class="n">JVM_SHORT_MAX</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">15</span><span class="p">)</span> <span class="o">-</span> <span class="mi">1</span>
<span class="n">JVM_INT_MIN</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="o">-</span><span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">31</span><span class="p">)</span>
<span class="n">JVM_INT_MAX</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">31</span><span class="p">)</span> <span class="o">-</span> <span class="mi">1</span>
<span class="n">JVM_LONG_MIN</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="o">-</span><span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">63</span><span class="p">)</span>
<span class="n">JVM_LONG_MAX</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="p">(</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">63</span><span class="p">)</span> <span class="o">-</span> <span class="mi">1</span>
<span class="k">def</span> <span class="nf">print_exec</span><span class="p">(</span><span class="n">stream</span><span class="p">:</span> <span class="n">TextIO</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">ei</span> <span class="o">=</span> <span class="n">sys</span><span class="o">.</span><span class="n">exc_info</span><span class="p">()</span>
<span class="n">traceback</span><span class="o">.</span><span class="n">print_exception</span><span class="p">(</span><span class="n">ei</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">ei</span><span class="p">[</span><span class="mi">1</span><span class="p">],</span> <span class="n">ei</span><span class="p">[</span><span class="mi">2</span><span class="p">],</span> <span class="kc">None</span><span class="p">,</span> <span class="n">stream</span><span class="p">)</span>
<div class="viewcode-block" id="VersionUtils"><a class="viewcode-back" href="../../reference/api/pyspark.util.VersionUtils.html#pyspark.VersionUtils">[docs]</a><span class="k">class</span> <span class="nc">VersionUtils</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Provides utility method to determine Spark versions with given input string.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="VersionUtils.majorMinorVersion"><a class="viewcode-back" href="../../reference/api/pyspark.util.VersionUtils.majorMinorVersion.html#pyspark.VersionUtils.majorMinorVersion">[docs]</a> <span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">majorMinorVersion</span><span class="p">(</span><span class="n">sparkVersion</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Tuple</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="nb">int</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Given a Spark version string, return the (major version number, minor version number).</span>
<span class="sd"> E.g., for 2.0.1-SNAPSHOT, return (2, 0).</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; sparkVersion = &quot;2.4.0&quot;</span>
<span class="sd"> &gt;&gt;&gt; VersionUtils.majorMinorVersion(sparkVersion)</span>
<span class="sd"> (2, 4)</span>
<span class="sd"> &gt;&gt;&gt; sparkVersion = &quot;2.3.0-SNAPSHOT&quot;</span>
<span class="sd"> &gt;&gt;&gt; VersionUtils.majorMinorVersion(sparkVersion)</span>
<span class="sd"> (2, 3)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">m</span> <span class="o">=</span> <span class="n">re</span><span class="o">.</span><span class="n">search</span><span class="p">(</span><span class="sa">r</span><span class="s2">&quot;^(\d+)\.(\d+)(\..*)?$&quot;</span><span class="p">,</span> <span class="n">sparkVersion</span><span class="p">)</span>
<span class="k">if</span> <span class="n">m</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="p">(</span><span class="nb">int</span><span class="p">(</span><span class="n">m</span><span class="o">.</span><span class="n">group</span><span class="p">(</span><span class="mi">1</span><span class="p">)),</span> <span class="nb">int</span><span class="p">(</span><span class="n">m</span><span class="o">.</span><span class="n">group</span><span class="p">(</span><span class="mi">2</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;Spark tried to parse &#39;</span><span class="si">%s</span><span class="s2">&#39; as a Spark&quot;</span> <span class="o">%</span> <span class="n">sparkVersion</span>
<span class="o">+</span> <span class="s2">&quot; version string, but it could not find the major and minor&quot;</span>
<span class="o">+</span> <span class="s2">&quot; version numbers.&quot;</span>
<span class="p">)</span></div></div>
<span class="k">class</span> <span class="nc">LogUtils</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Utils for querying structured Spark logs with Spark SQL.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">LOG_SCHEMA</span> <span class="o">=</span> <span class="p">(</span>
<span class="s2">&quot;ts TIMESTAMP, &quot;</span>
<span class="s2">&quot;level STRING, &quot;</span>
<span class="s2">&quot;msg STRING, &quot;</span>
<span class="s2">&quot;context map&lt;STRING, STRING&gt;, &quot;</span>
<span class="s2">&quot;exception STRUCT&lt;class STRING, msg STRING, &quot;</span>
<span class="s2">&quot;stacktrace ARRAY&lt;STRUCT&lt;class STRING, method STRING, file STRING,line STRING&gt;&gt;&gt;,&quot;</span>
<span class="s2">&quot;logger STRING&quot;</span>
<span class="p">)</span>
<span class="k">def</span> <span class="nf">fail_on_stopiteration</span><span class="p">(</span><span class="n">f</span><span class="p">:</span> <span class="n">Callable</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Callable</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Wraps the input function to fail on &#39;StopIteration&#39; by raising a &#39;RuntimeError&#39;</span>
<span class="sd"> prevents silent loss of data when &#39;f&#39; is used in a for loop in Spark code</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">wrapper</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="n">Any</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Any</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">return</span> <span class="n">f</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">StopIteration</span> <span class="k">as</span> <span class="n">exc</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">errorClass</span><span class="o">=</span><span class="s2">&quot;STOP_ITERATION_OCCURRED&quot;</span><span class="p">,</span>
<span class="n">messageParameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;exc&quot;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">exc</span><span class="p">),</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">wrapper</span>
<span class="k">def</span> <span class="nf">walk_tb</span><span class="p">(</span><span class="n">tb</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">TracebackType</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">TracebackType</span><span class="p">]:</span>
<span class="k">while</span> <span class="n">tb</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">tb</span>
<span class="n">tb</span> <span class="o">=</span> <span class="n">tb</span><span class="o">.</span><span class="n">tb_next</span>
<span class="k">def</span> <span class="nf">try_simplify_traceback</span><span class="p">(</span><span class="n">tb</span><span class="p">:</span> <span class="n">TracebackType</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="n">TracebackType</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Simplify the traceback. It removes the tracebacks in the current package, and only</span>
<span class="sd"> shows the traceback that is related to the thirdparty and user-specified codes.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> TracebackType or None</span>
<span class="sd"> Simplified traceback instance. It returns None if it fails to simplify.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This keeps the tracebacks once it sees they are from a different file even</span>
<span class="sd"> though the following tracebacks are from the current package.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; import importlib</span>
<span class="sd"> &gt;&gt;&gt; import sys</span>
<span class="sd"> &gt;&gt;&gt; import traceback</span>
<span class="sd"> &gt;&gt;&gt; import tempfile</span>
<span class="sd"> &gt;&gt;&gt; with tempfile.TemporaryDirectory(prefix=&quot;try_simplify_traceback&quot;) as tmp_dir:</span>
<span class="sd"> ... with open(&quot;%s/dummy_module.py&quot; % tmp_dir, &quot;w&quot;) as f:</span>
<span class="sd"> ... _ = f.write(</span>
<span class="sd"> ... &#39;def raise_stop_iteration():\\n&#39;</span>
<span class="sd"> ... &#39; raise StopIteration()\\n\\n&#39;</span>
<span class="sd"> ... &#39;def simple_wrapper(f):\\n&#39;</span>
<span class="sd"> ... &#39; def wrapper(*a, **k):\\n&#39;</span>
<span class="sd"> ... &#39; return f(*a, **k)\\n&#39;</span>
<span class="sd"> ... &#39; return wrapper\\n&#39;)</span>
<span class="sd"> ... f.flush()</span>
<span class="sd"> ... spec = importlib.util.spec_from_file_location(</span>
<span class="sd"> ... &quot;dummy_module&quot;, &quot;%s/dummy_module.py&quot; % tmp_dir)</span>
<span class="sd"> ... dummy_module = importlib.util.module_from_spec(spec)</span>
<span class="sd"> ... spec.loader.exec_module(dummy_module)</span>
<span class="sd"> &gt;&gt;&gt; def skip_doctest_traceback(tb):</span>
<span class="sd"> ... import pyspark</span>
<span class="sd"> ... root = os.path.dirname(pyspark.__file__)</span>
<span class="sd"> ... pairs = zip(walk_tb(tb), traceback.extract_tb(tb))</span>
<span class="sd"> ... for cur_tb, cur_frame in pairs:</span>
<span class="sd"> ... if cur_frame.filename.startswith(root):</span>
<span class="sd"> ... return cur_tb</span>
<span class="sd"> Regular exceptions should show the file name of the current package as below.</span>
<span class="sd"> &gt;&gt;&gt; exc_info = None</span>
<span class="sd"> &gt;&gt;&gt; try:</span>
<span class="sd"> ... fail_on_stopiteration(dummy_module.raise_stop_iteration)()</span>
<span class="sd"> ... except Exception as e:</span>
<span class="sd"> ... tb = sys.exc_info()[-1]</span>
<span class="sd"> ... e.__cause__ = None</span>
<span class="sd"> ... exc_info = &quot;&quot;.join(</span>
<span class="sd"> ... traceback.format_exception(type(e), e, tb))</span>
<span class="sd"> &gt;&gt;&gt; print(exc_info) # doctest: +NORMALIZE_WHITESPACE, +ELLIPSIS</span>
<span class="sd"> Traceback (most recent call last):</span>
<span class="sd"> File ...</span>
<span class="sd"> ...</span>
<span class="sd"> File &quot;/.../pyspark/util.py&quot;, line ...</span>
<span class="sd"> ...</span>
<span class="sd"> pyspark.errors.exceptions.base.PySparkRuntimeError: ...</span>
<span class="sd"> &gt;&gt;&gt; &quot;pyspark/util.py&quot; in exc_info</span>
<span class="sd"> True</span>
<span class="sd"> If the traceback is simplified with this method, it hides the current package file name:</span>
<span class="sd"> &gt;&gt;&gt; exc_info = None</span>
<span class="sd"> &gt;&gt;&gt; try:</span>
<span class="sd"> ... fail_on_stopiteration(dummy_module.raise_stop_iteration)()</span>
<span class="sd"> ... except Exception as e:</span>
<span class="sd"> ... tb = try_simplify_traceback(sys.exc_info()[-1])</span>
<span class="sd"> ... e.__cause__ = None</span>
<span class="sd"> ... exc_info = &quot;&quot;.join(</span>
<span class="sd"> ... traceback.format_exception(</span>
<span class="sd"> ... type(e), e, try_simplify_traceback(skip_doctest_traceback(tb))))</span>
<span class="sd"> &gt;&gt;&gt; print(exc_info) # doctest: +NORMALIZE_WHITESPACE, +ELLIPSIS</span>
<span class="sd"> pyspark.errors.exceptions.base.PySparkRuntimeError: ...</span>
<span class="sd"> &gt;&gt;&gt; &quot;pyspark/util.py&quot; in exc_info</span>
<span class="sd"> False</span>
<span class="sd"> In the case below, the traceback contains the current package in the middle.</span>
<span class="sd"> In this case, it just hides the top occurrence only.</span>
<span class="sd"> &gt;&gt;&gt; exc_info = None</span>
<span class="sd"> &gt;&gt;&gt; try:</span>
<span class="sd"> ... fail_on_stopiteration(dummy_module.simple_wrapper(</span>
<span class="sd"> ... fail_on_stopiteration(dummy_module.raise_stop_iteration)))()</span>
<span class="sd"> ... except Exception as e:</span>
<span class="sd"> ... tb = sys.exc_info()[-1]</span>
<span class="sd"> ... e.__cause__ = None</span>
<span class="sd"> ... exc_info_a = &quot;&quot;.join(</span>
<span class="sd"> ... traceback.format_exception(type(e), e, tb))</span>
<span class="sd"> ... exc_info_b = &quot;&quot;.join(</span>
<span class="sd"> ... traceback.format_exception(</span>
<span class="sd"> ... type(e), e, try_simplify_traceback(skip_doctest_traceback(tb))))</span>
<span class="sd"> &gt;&gt;&gt; exc_info_a.count(&quot;pyspark/util.py&quot;)</span>
<span class="sd"> 2</span>
<span class="sd"> &gt;&gt;&gt; exc_info_b.count(&quot;pyspark/util.py&quot;)</span>
<span class="sd"> 1</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="s2">&quot;pypy&quot;</span> <span class="ow">in</span> <span class="n">platform</span><span class="o">.</span><span class="n">python_implementation</span><span class="p">()</span><span class="o">.</span><span class="n">lower</span><span class="p">():</span>
<span class="c1"># Traceback modification is not supported with PyPy in PySpark.</span>
<span class="k">return</span> <span class="kc">None</span>
<span class="kn">import</span> <span class="nn">pyspark</span>
<span class="n">root</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">dirname</span><span class="p">(</span><span class="n">pyspark</span><span class="o">.</span><span class="vm">__file__</span><span class="p">)</span>
<span class="n">tb_next</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">new_tb</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">pairs</span> <span class="o">=</span> <span class="nb">zip</span><span class="p">(</span><span class="n">walk_tb</span><span class="p">(</span><span class="n">tb</span><span class="p">),</span> <span class="n">traceback</span><span class="o">.</span><span class="n">extract_tb</span><span class="p">(</span><span class="n">tb</span><span class="p">))</span>
<span class="n">last_seen</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">cur_tb</span><span class="p">,</span> <span class="n">cur_frame</span> <span class="ow">in</span> <span class="n">pairs</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">cur_frame</span><span class="o">.</span><span class="n">filename</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="n">root</span><span class="p">):</span>
<span class="c1"># Filter the stacktrace from the PySpark source itself.</span>
<span class="n">last_seen</span> <span class="o">=</span> <span class="p">[(</span><span class="n">cur_tb</span><span class="p">,</span> <span class="n">cur_frame</span><span class="p">)]</span>
<span class="k">break</span>
<span class="k">for</span> <span class="n">cur_tb</span><span class="p">,</span> <span class="n">cur_frame</span> <span class="ow">in</span> <span class="nb">reversed</span><span class="p">(</span><span class="nb">list</span><span class="p">(</span><span class="n">itertools</span><span class="o">.</span><span class="n">chain</span><span class="p">(</span><span class="n">last_seen</span><span class="p">,</span> <span class="n">pairs</span><span class="p">))):</span>
<span class="c1"># Once we have seen the file names outside, don&#39;t skip.</span>
<span class="n">new_tb</span> <span class="o">=</span> <span class="n">TracebackType</span><span class="p">(</span>
<span class="n">tb_next</span><span class="o">=</span><span class="n">tb_next</span><span class="p">,</span>
<span class="n">tb_frame</span><span class="o">=</span><span class="n">cur_tb</span><span class="o">.</span><span class="n">tb_frame</span><span class="p">,</span>
<span class="n">tb_lasti</span><span class="o">=</span><span class="n">cur_tb</span><span class="o">.</span><span class="n">tb_frame</span><span class="o">.</span><span class="n">f_lasti</span><span class="p">,</span>
<span class="n">tb_lineno</span><span class="o">=</span><span class="n">cur_tb</span><span class="o">.</span><span class="n">tb_frame</span><span class="o">.</span><span class="n">f_lineno</span> <span class="k">if</span> <span class="n">cur_tb</span><span class="o">.</span><span class="n">tb_frame</span><span class="o">.</span><span class="n">f_lineno</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="k">else</span> <span class="o">-</span><span class="mi">1</span><span class="p">,</span>
<span class="p">)</span>
<span class="n">tb_next</span> <span class="o">=</span> <span class="n">new_tb</span>
<span class="k">return</span> <span class="n">new_tb</span>
<span class="k">def</span> <span class="nf">_print_missing_jar</span><span class="p">(</span><span class="n">lib_name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">pkg_name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">jar_name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">spark_version</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd">________________________________________________________________________________________________</span>
<span class="sd"> Spark %(lib_name)s libraries not found in class path. Try one of the following.</span>
<span class="sd"> 1. Include the %(lib_name)s library and its dependencies with in the</span>
<span class="sd"> spark-submit command as</span>
<span class="sd"> $ bin/spark-submit --packages org.apache.spark:spark-%(pkg_name)s:%(spark_version)s ...</span>
<span class="sd"> 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,</span>
<span class="sd"> Group Id = org.apache.spark, Artifact Id = spark-%(jar_name)s, Version = %(spark_version)s.</span>
<span class="sd"> Then, include the jar in the spark-submit command as</span>
<span class="sd"> $ bin/spark-submit --jars &lt;spark-%(jar_name)s.jar&gt; ...</span>
<span class="sd">________________________________________________________________________________________________</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="o">%</span> <span class="p">{</span>
<span class="s2">&quot;lib_name&quot;</span><span class="p">:</span> <span class="n">lib_name</span><span class="p">,</span>
<span class="s2">&quot;pkg_name&quot;</span><span class="p">:</span> <span class="n">pkg_name</span><span class="p">,</span>
<span class="s2">&quot;jar_name&quot;</span><span class="p">:</span> <span class="n">jar_name</span><span class="p">,</span>
<span class="s2">&quot;spark_version&quot;</span><span class="p">:</span> <span class="n">spark_version</span><span class="p">,</span>
<span class="p">}</span>
<span class="p">)</span>
<span class="k">def</span> <span class="nf">_parse_memory</span><span class="p">(</span><span class="n">s</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">int</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Parse a memory string in the format supported by Java (e.g. 1g, 200m) and</span>
<span class="sd"> return the value in MiB</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; _parse_memory(&quot;256m&quot;)</span>
<span class="sd"> 256</span>
<span class="sd"> &gt;&gt;&gt; _parse_memory(&quot;2g&quot;)</span>
<span class="sd"> 2048</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">units</span> <span class="o">=</span> <span class="p">{</span><span class="s2">&quot;g&quot;</span><span class="p">:</span> <span class="mi">1024</span><span class="p">,</span> <span class="s2">&quot;m&quot;</span><span class="p">:</span> <span class="mi">1</span><span class="p">,</span> <span class="s2">&quot;t&quot;</span><span class="p">:</span> <span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">20</span><span class="p">,</span> <span class="s2">&quot;k&quot;</span><span class="p">:</span> <span class="mf">1.0</span> <span class="o">/</span> <span class="mi">1024</span><span class="p">}</span>
<span class="k">if</span> <span class="n">s</span><span class="p">[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">units</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;invalid format: &quot;</span> <span class="o">+</span> <span class="n">s</span><span class="p">)</span>
<span class="k">return</span> <span class="nb">int</span><span class="p">(</span><span class="nb">float</span><span class="p">(</span><span class="n">s</span><span class="p">[:</span><span class="o">-</span><span class="mi">1</span><span class="p">])</span> <span class="o">*</span> <span class="n">units</span><span class="p">[</span><span class="n">s</span><span class="p">[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">lower</span><span class="p">()])</span>
<div class="viewcode-block" id="inheritable_thread_target"><a class="viewcode-back" href="../../reference/api/pyspark.inheritable_thread_target.html#pyspark.inheritable_thread_target">[docs]</a><span class="k">def</span> <span class="nf">inheritable_thread_target</span><span class="p">(</span><span class="n">f</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="n">Callable</span><span class="p">,</span> <span class="s2">&quot;SparkSession&quot;</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Callable</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return thread target wrapper which is recommended to be used in PySpark when the</span>
<span class="sd"> pinned thread mode is enabled. The wrapper function, before calling original</span>
<span class="sd"> thread target, it inherits the inheritable properties specific</span>
<span class="sd"> to JVM thread such as ``InheritableThreadLocal``, or thread local such as tags</span>
<span class="sd"> with Spark Connect.</span>
<span class="sd"> When the pinned thread mode is off, it return the original ``f``.</span>
<span class="sd"> .. versionadded:: 3.2.0</span>
<span class="sd"> .. versionchanged:: 3.5.0</span>
<span class="sd"> Supports Spark Connect.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> f : function, or :class:`SparkSession`</span>
<span class="sd"> the original thread target, or :class:`SparkSession` if Spark Connect is being used.</span>
<span class="sd"> See the examples below.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is experimental.</span>
<span class="sd"> It is important to know that it captures the local properties or tags when you</span>
<span class="sd"> decorate it whereas :class:`InheritableThread` captures when the thread is started.</span>
<span class="sd"> Therefore, it is encouraged to decorate it when you want to capture the local</span>
<span class="sd"> properties.</span>
<span class="sd"> For example, the local properties or tags from the current Spark context or Spark</span>
<span class="sd"> session is captured when you define a function here instead of the invocation:</span>
<span class="sd"> &gt;&gt;&gt; @inheritable_thread_target</span>
<span class="sd"> ... def target_func():</span>
<span class="sd"> ... pass # your codes.</span>
<span class="sd"> If you have any updates on local properties or tags afterwards, it would not be</span>
<span class="sd"> reflected to the Spark context in ``target_func()``.</span>
<span class="sd"> The example below mimics the behavior of JVM threads as close as possible:</span>
<span class="sd"> &gt;&gt;&gt; Thread(target=inheritable_thread_target(target_func)).start() # doctest: +SKIP</span>
<span class="sd"> If you&#39;re using Spark Connect, you should explicitly provide Spark session as follows:</span>
<span class="sd"> &gt;&gt;&gt; @inheritable_thread_target(session) # doctest: +SKIP</span>
<span class="sd"> ... def target_func():</span>
<span class="sd"> ... pass # your codes.</span>
<span class="sd"> &gt;&gt;&gt; Thread(target=inheritable_thread_target(session)(target_func)).start() # doctest: +SKIP</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">is_remote</span>
<span class="c1"># Spark Connect</span>
<span class="k">if</span> <span class="n">is_remote</span><span class="p">():</span>
<span class="n">session</span> <span class="o">=</span> <span class="n">f</span>
<span class="k">assert</span> <span class="n">session</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">,</span> <span class="s2">&quot;Spark Connect session must be provided.&quot;</span>
<span class="k">def</span> <span class="nf">outer</span><span class="p">(</span><span class="n">ff</span><span class="p">:</span> <span class="n">Callable</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Callable</span><span class="p">:</span>
<span class="n">session_client_thread_local_attrs</span> <span class="o">=</span> <span class="p">[</span>
<span class="p">(</span><span class="n">attr</span><span class="p">,</span> <span class="n">copy</span><span class="o">.</span><span class="n">deepcopy</span><span class="p">(</span><span class="n">value</span><span class="p">))</span>
<span class="k">for</span> <span class="p">(</span>
<span class="n">attr</span><span class="p">,</span>
<span class="n">value</span><span class="p">,</span>
<span class="p">)</span> <span class="ow">in</span> <span class="n">session</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">thread_local</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">items</span><span class="p">()</span> <span class="c1"># type: ignore[union-attr]</span>
<span class="p">]</span>
<span class="nd">@functools</span><span class="o">.</span><span class="n">wraps</span><span class="p">(</span><span class="n">ff</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">inner</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="n">Any</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Any</span><span class="p">:</span>
<span class="c1"># Set thread locals in child thread.</span>
<span class="k">for</span> <span class="n">attr</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">session_client_thread_local_attrs</span><span class="p">:</span>
<span class="nb">setattr</span><span class="p">(</span><span class="n">session</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">thread_local</span><span class="p">,</span> <span class="n">attr</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span> <span class="c1"># type: ignore[union-attr]</span>
<span class="k">return</span> <span class="n">ff</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">return</span> <span class="n">inner</span>
<span class="k">return</span> <span class="n">outer</span>
<span class="c1"># Non Spark Connect</span>
<span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkContext</span>
<span class="kn">from</span> <span class="nn">py4j.clientserver</span> <span class="kn">import</span> <span class="n">ClientServer</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">SparkContext</span><span class="o">.</span><span class="n">_gateway</span><span class="p">,</span> <span class="n">ClientServer</span><span class="p">):</span>
<span class="c1"># Here&#39;s when the pinned-thread mode (PYSPARK_PIN_THREAD) is on.</span>
<span class="c1"># NOTICE the internal difference vs `InheritableThread`. `InheritableThread`</span>
<span class="c1"># copies local properties when the thread starts but `inheritable_thread_target`</span>
<span class="c1"># copies when the function is wrapped.</span>
<span class="k">assert</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">properties</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">getLocalProperties</span><span class="p">()</span><span class="o">.</span><span class="n">clone</span><span class="p">()</span>
<span class="k">assert</span> <span class="nb">callable</span><span class="p">(</span><span class="n">f</span><span class="p">)</span>
<span class="nd">@functools</span><span class="o">.</span><span class="n">wraps</span><span class="p">(</span><span class="n">f</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">wrapped</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="n">Any</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Any</span><span class="p">:</span>
<span class="c1"># Set local properties in child thread.</span>
<span class="k">assert</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">setLocalProperties</span><span class="p">(</span><span class="n">properties</span><span class="p">)</span>
<span class="k">return</span> <span class="n">f</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> <span class="c1"># type: ignore[misc, operator]</span>
<span class="k">return</span> <span class="n">wrapped</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">f</span> <span class="c1"># type: ignore[return-value]</span></div>
<span class="k">def</span> <span class="nf">handle_worker_exception</span><span class="p">(</span><span class="n">e</span><span class="p">:</span> <span class="ne">BaseException</span><span class="p">,</span> <span class="n">outfile</span><span class="p">:</span> <span class="n">IO</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Handles exception for Python worker which writes SpecialLengths.PYTHON_EXCEPTION_THROWN (-2)</span>
<span class="sd"> and exception traceback info to outfile. JVM could then read from the outfile and perform</span>
<span class="sd"> exception handling there.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">exc_info</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">os</span><span class="o">.</span><span class="n">environ</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">&quot;SPARK_SIMPLIFIED_TRACEBACK&quot;</span><span class="p">,</span> <span class="kc">False</span><span class="p">):</span>
<span class="n">tb</span> <span class="o">=</span> <span class="n">try_simplify_traceback</span><span class="p">(</span><span class="n">sys</span><span class="o">.</span><span class="n">exc_info</span><span class="p">()[</span><span class="o">-</span><span class="mi">1</span><span class="p">])</span> <span class="c1"># type: ignore[arg-type]</span>
<span class="k">if</span> <span class="n">tb</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">e</span><span class="o">.</span><span class="n">__cause__</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">exc_info</span> <span class="o">=</span> <span class="s2">&quot;&quot;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">traceback</span><span class="o">.</span><span class="n">format_exception</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="n">e</span><span class="p">),</span> <span class="n">e</span><span class="p">,</span> <span class="n">tb</span><span class="p">))</span>
<span class="k">if</span> <span class="n">exc_info</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">exc_info</span> <span class="o">=</span> <span class="n">traceback</span><span class="o">.</span><span class="n">format_exc</span><span class="p">()</span>
<span class="n">write_int</span><span class="p">(</span><span class="n">SpecialLengths</span><span class="o">.</span><span class="n">PYTHON_EXCEPTION_THROWN</span><span class="p">,</span> <span class="n">outfile</span><span class="p">)</span>
<span class="n">write_with_length</span><span class="p">(</span><span class="n">exc_info</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s2">&quot;utf-8&quot;</span><span class="p">),</span> <span class="n">outfile</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">IOError</span><span class="p">:</span>
<span class="c1"># JVM close the socket</span>
<span class="k">pass</span>
<span class="k">except</span> <span class="ne">BaseException</span><span class="p">:</span>
<span class="c1"># Write the error to stderr if it happened while serializing</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">&quot;PySpark worker failed with exception:&quot;</span><span class="p">,</span> <span class="n">file</span><span class="o">=</span><span class="n">sys</span><span class="o">.</span><span class="n">stderr</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="n">traceback</span><span class="o">.</span><span class="n">format_exc</span><span class="p">(),</span> <span class="n">file</span><span class="o">=</span><span class="n">sys</span><span class="o">.</span><span class="n">stderr</span><span class="p">)</span>
<div class="viewcode-block" id="InheritableThread"><a class="viewcode-back" href="../../reference/api/pyspark.InheritableThread.html#pyspark.InheritableThread">[docs]</a><span class="k">class</span> <span class="nc">InheritableThread</span><span class="p">(</span><span class="n">threading</span><span class="o">.</span><span class="n">Thread</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Thread that is recommended to be used in PySpark when the pinned thread mode is</span>
<span class="sd"> enabled. The wrapper function, before calling original thread target, it</span>
<span class="sd"> inherits the inheritable properties specific to JVM thread such as</span>
<span class="sd"> ``InheritableThreadLocal``, or thread local such as tags</span>
<span class="sd"> with Spark Connect.</span>
<span class="sd"> When the pinned thread mode is off, this works as :class:`threading.Thread`.</span>
<span class="sd"> .. versionadded:: 3.1.0</span>
<span class="sd"> .. versionchanged:: 3.5.0</span>
<span class="sd"> Supports Spark Connect.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is experimental.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">_props</span><span class="p">:</span> <span class="s2">&quot;JavaObject&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">target</span><span class="p">:</span> <span class="n">Callable</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="n">session</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="s2">&quot;SparkSession&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="n">Any</span>
<span class="p">):</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">is_remote</span>
<span class="c1"># Spark Connect</span>
<span class="k">if</span> <span class="n">is_remote</span><span class="p">():</span>
<span class="k">assert</span> <span class="n">session</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">,</span> <span class="s2">&quot;Spark Connect must be provided.&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_session</span> <span class="o">=</span> <span class="n">session</span>
<span class="k">def</span> <span class="nf">copy_local_properties</span><span class="p">(</span><span class="o">*</span><span class="n">a</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="o">**</span><span class="n">k</span><span class="p">:</span> <span class="n">Any</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Any</span><span class="p">:</span>
<span class="c1"># Set tags in child thread.</span>
<span class="k">assert</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s2">&quot;_tags&quot;</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">thread_local</span><span class="o">.</span><span class="n">tags</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_tags</span> <span class="c1"># type: ignore[union-attr, has-type]</span>
<span class="k">return</span> <span class="n">target</span><span class="p">(</span><span class="o">*</span><span class="n">a</span><span class="p">,</span> <span class="o">**</span><span class="n">k</span><span class="p">)</span>
<span class="nb">super</span><span class="p">(</span><span class="n">InheritableThread</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">target</span><span class="o">=</span><span class="n">copy_local_properties</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span> <span class="c1"># type: ignore[misc]</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># Non Spark Connect</span>
<span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkContext</span>
<span class="kn">from</span> <span class="nn">py4j.clientserver</span> <span class="kn">import</span> <span class="n">ClientServer</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">SparkContext</span><span class="o">.</span><span class="n">_gateway</span><span class="p">,</span> <span class="n">ClientServer</span><span class="p">):</span>
<span class="c1"># Here&#39;s when the pinned-thread mode (PYSPARK_PIN_THREAD) is on.</span>
<span class="k">def</span> <span class="nf">copy_local_properties</span><span class="p">(</span><span class="o">*</span><span class="n">a</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="o">**</span><span class="n">k</span><span class="p">:</span> <span class="n">Any</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Any</span><span class="p">:</span>
<span class="c1"># self._props is set before starting the thread to match the behavior with JVM.</span>
<span class="k">assert</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s2">&quot;_props&quot;</span><span class="p">)</span>
<span class="k">assert</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">setLocalProperties</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_props</span><span class="p">)</span>
<span class="k">return</span> <span class="n">target</span><span class="p">(</span><span class="o">*</span><span class="n">a</span><span class="p">,</span> <span class="o">**</span><span class="n">k</span><span class="p">)</span>
<span class="nb">super</span><span class="p">(</span><span class="n">InheritableThread</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">target</span><span class="o">=</span><span class="n">copy_local_properties</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span> <span class="c1"># type: ignore[misc]</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="nb">super</span><span class="p">(</span><span class="n">InheritableThread</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">target</span><span class="o">=</span><span class="n">target</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span> <span class="c1"># type: ignore[misc]</span>
<span class="p">)</span>
<span class="k">def</span> <span class="nf">start</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">is_remote</span>
<span class="k">if</span> <span class="n">is_remote</span><span class="p">():</span>
<span class="c1"># Spark Connect</span>
<span class="k">assert</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s2">&quot;_session&quot;</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_session</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">thread_local</span><span class="p">,</span> <span class="s2">&quot;tags&quot;</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_session</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">thread_local</span><span class="o">.</span><span class="n">tags</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_tags</span> <span class="o">=</span> <span class="nb">set</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_session</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">thread_local</span><span class="o">.</span><span class="n">tags</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># Non Spark Connect</span>
<span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkContext</span>
<span class="kn">from</span> <span class="nn">py4j.clientserver</span> <span class="kn">import</span> <span class="n">ClientServer</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">SparkContext</span><span class="o">.</span><span class="n">_gateway</span><span class="p">,</span> <span class="n">ClientServer</span><span class="p">):</span>
<span class="c1"># Here&#39;s when the pinned-thread mode (PYSPARK_PIN_THREAD) is on.</span>
<span class="c1"># Local property copy should happen in Thread.start to mimic JVM&#39;s behavior.</span>
<span class="k">assert</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_props</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">getLocalProperties</span><span class="p">()</span><span class="o">.</span><span class="n">clone</span><span class="p">()</span>
<span class="p">)</span>
<span class="k">return</span> <span class="nb">super</span><span class="p">(</span><span class="n">InheritableThread</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">start</span><span class="p">()</span></div>
<span class="k">class</span> <span class="nc">PythonEvalType</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Evaluation type of python rdd.</span>
<span class="sd"> These values are internal to PySpark.</span>
<span class="sd"> These values should match values in org.apache.spark.api.python.PythonEvalType.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">NON_UDF</span><span class="p">:</span> <span class="s2">&quot;NonUDFType&quot;</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">SQL_BATCHED_UDF</span><span class="p">:</span> <span class="s2">&quot;SQLBatchedUDFType&quot;</span> <span class="o">=</span> <span class="mi">100</span>
<span class="n">SQL_ARROW_BATCHED_UDF</span><span class="p">:</span> <span class="s2">&quot;SQLArrowBatchedUDFType&quot;</span> <span class="o">=</span> <span class="mi">101</span>
<span class="n">SQL_SCALAR_PANDAS_UDF</span><span class="p">:</span> <span class="s2">&quot;PandasScalarUDFType&quot;</span> <span class="o">=</span> <span class="mi">200</span>
<span class="n">SQL_GROUPED_MAP_PANDAS_UDF</span><span class="p">:</span> <span class="s2">&quot;PandasGroupedMapUDFType&quot;</span> <span class="o">=</span> <span class="mi">201</span>
<span class="n">SQL_GROUPED_AGG_PANDAS_UDF</span><span class="p">:</span> <span class="s2">&quot;PandasGroupedAggUDFType&quot;</span> <span class="o">=</span> <span class="mi">202</span>
<span class="n">SQL_WINDOW_AGG_PANDAS_UDF</span><span class="p">:</span> <span class="s2">&quot;PandasWindowAggUDFType&quot;</span> <span class="o">=</span> <span class="mi">203</span>
<span class="n">SQL_SCALAR_PANDAS_ITER_UDF</span><span class="p">:</span> <span class="s2">&quot;PandasScalarIterUDFType&quot;</span> <span class="o">=</span> <span class="mi">204</span>
<span class="n">SQL_MAP_PANDAS_ITER_UDF</span><span class="p">:</span> <span class="s2">&quot;PandasMapIterUDFType&quot;</span> <span class="o">=</span> <span class="mi">205</span>
<span class="n">SQL_COGROUPED_MAP_PANDAS_UDF</span><span class="p">:</span> <span class="s2">&quot;PandasCogroupedMapUDFType&quot;</span> <span class="o">=</span> <span class="mi">206</span>
<span class="n">SQL_MAP_ARROW_ITER_UDF</span><span class="p">:</span> <span class="s2">&quot;ArrowMapIterUDFType&quot;</span> <span class="o">=</span> <span class="mi">207</span>
<span class="n">SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE</span><span class="p">:</span> <span class="s2">&quot;PandasGroupedMapUDFWithStateType&quot;</span> <span class="o">=</span> <span class="mi">208</span>
<span class="n">SQL_GROUPED_MAP_ARROW_UDF</span><span class="p">:</span> <span class="s2">&quot;ArrowGroupedMapUDFType&quot;</span> <span class="o">=</span> <span class="mi">209</span>
<span class="n">SQL_COGROUPED_MAP_ARROW_UDF</span><span class="p">:</span> <span class="s2">&quot;ArrowCogroupedMapUDFType&quot;</span> <span class="o">=</span> <span class="mi">210</span>
<span class="n">SQL_TRANSFORM_WITH_STATE_PANDAS_UDF</span><span class="p">:</span> <span class="s2">&quot;PandasGroupedMapUDFTransformWithStateType&quot;</span> <span class="o">=</span> <span class="mi">211</span>
<span class="n">SQL_TABLE_UDF</span><span class="p">:</span> <span class="s2">&quot;SQLTableUDFType&quot;</span> <span class="o">=</span> <span class="mi">300</span>
<span class="n">SQL_ARROW_TABLE_UDF</span><span class="p">:</span> <span class="s2">&quot;SQLArrowTableUDFType&quot;</span> <span class="o">=</span> <span class="mi">301</span>
<span class="k">def</span> <span class="nf">_create_local_socket</span><span class="p">(</span><span class="n">sock_info</span><span class="p">:</span> <span class="s2">&quot;JavaArray&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;io.BufferedRWPair&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Create a local socket that can be used to load deserialized data from the JVM</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> sock_info : tuple</span>
<span class="sd"> Tuple containing port number and authentication secret for a local socket.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> sockfile file descriptor of the local socket</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">sockfile</span><span class="p">:</span> <span class="s2">&quot;io.BufferedRWPair&quot;</span>
<span class="n">sock</span><span class="p">:</span> <span class="s2">&quot;socket.socket&quot;</span>
<span class="n">port</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="n">sock_info</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">auth_secret</span><span class="p">:</span> <span class="nb">str</span> <span class="o">=</span> <span class="n">sock_info</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="n">sockfile</span><span class="p">,</span> <span class="n">sock</span> <span class="o">=</span> <span class="n">local_connect_and_auth</span><span class="p">(</span><span class="n">port</span><span class="p">,</span> <span class="n">auth_secret</span><span class="p">)</span>
<span class="c1"># The RDD materialization time is unpredictable, if we set a timeout for socket reading</span>
<span class="c1"># operation, it will very possibly fail. See SPARK-18281.</span>
<span class="n">sock</span><span class="o">.</span><span class="n">settimeout</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span>
<span class="k">return</span> <span class="n">sockfile</span>
<span class="k">def</span> <span class="nf">_load_from_socket</span><span class="p">(</span><span class="n">sock_info</span><span class="p">:</span> <span class="s2">&quot;JavaArray&quot;</span><span class="p">,</span> <span class="n">serializer</span><span class="p">:</span> <span class="s2">&quot;Serializer&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">Any</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Connect to a local socket described by sock_info and use the given serializer to yield data</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> sock_info : tuple</span>
<span class="sd"> Tuple containing port number and authentication secret for a local socket.</span>
<span class="sd"> serializer : class:`Serializer`</span>
<span class="sd"> The PySpark serializer to use</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> result of meth:`Serializer.load_stream`,</span>
<span class="sd"> usually a generator that yields deserialized data</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">sockfile</span> <span class="o">=</span> <span class="n">_create_local_socket</span><span class="p">(</span><span class="n">sock_info</span><span class="p">)</span>
<span class="c1"># The socket will be automatically closed when garbage-collected.</span>
<span class="k">return</span> <span class="n">serializer</span><span class="o">.</span><span class="n">load_stream</span><span class="p">(</span><span class="n">sockfile</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_local_iterator_from_socket</span><span class="p">(</span><span class="n">sock_info</span><span class="p">:</span> <span class="s2">&quot;JavaArray&quot;</span><span class="p">,</span> <span class="n">serializer</span><span class="p">:</span> <span class="s2">&quot;Serializer&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">Any</span><span class="p">]:</span>
<span class="k">class</span> <span class="nc">PyLocalIterable</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Create a synchronous local iterable over a socket&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">_sock_info</span><span class="p">:</span> <span class="s2">&quot;JavaArray&quot;</span><span class="p">,</span> <span class="n">_serializer</span><span class="p">:</span> <span class="s2">&quot;Serializer&quot;</span><span class="p">):</span>
<span class="n">port</span><span class="p">:</span> <span class="nb">int</span>
<span class="n">auth_secret</span><span class="p">:</span> <span class="nb">str</span>
<span class="n">jsocket_auth_server</span><span class="p">:</span> <span class="s2">&quot;JavaObject&quot;</span>
<span class="n">port</span><span class="p">,</span> <span class="n">auth_secret</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">jsocket_auth_server</span> <span class="o">=</span> <span class="n">_sock_info</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sockfile</span> <span class="o">=</span> <span class="n">_create_local_socket</span><span class="p">((</span><span class="n">port</span><span class="p">,</span> <span class="n">auth_secret</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_serializer</span> <span class="o">=</span> <span class="n">_serializer</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_read_iter</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">Any</span><span class="p">]</span> <span class="o">=</span> <span class="nb">iter</span><span class="p">([])</span> <span class="c1"># Initialize as empty iterator</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_read_status</span> <span class="o">=</span> <span class="mi">1</span>
<span class="k">def</span> <span class="fm">__iter__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">Any</span><span class="p">]:</span>
<span class="k">while</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_status</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="c1"># Request next partition data from Java</span>
<span class="n">write_int</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sockfile</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sockfile</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span>
<span class="c1"># If response is 1 then there is a partition to read, if 0 then fully consumed</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_read_status</span> <span class="o">=</span> <span class="n">read_int</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_sockfile</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_status</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="c1"># Load the partition data as a stream and read each item</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_read_iter</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_serializer</span><span class="o">.</span><span class="n">load_stream</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_sockfile</span><span class="p">)</span>
<span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_iter</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">item</span>
<span class="c1"># An error occurred, join serving thread and raise any exceptions from the JVM</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_status</span> <span class="o">==</span> <span class="o">-</span><span class="mi">1</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">jsocket_auth_server</span><span class="o">.</span><span class="n">getResult</span><span class="p">()</span>
<span class="k">def</span> <span class="fm">__del__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># If local iterator is not fully consumed,</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_status</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="c1"># Finish consuming partition data stream</span>
<span class="k">for</span> <span class="n">_</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_iter</span><span class="p">:</span>
<span class="k">pass</span>
<span class="c1"># Tell Java to stop sending data and close connection</span>
<span class="n">write_int</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sockfile</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sockfile</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span>
<span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span>
<span class="c1"># Ignore any errors, socket is automatically closed when garbage-collected</span>
<span class="k">pass</span>
<span class="k">return</span> <span class="nb">iter</span><span class="p">(</span><span class="n">PyLocalIterable</span><span class="p">(</span><span class="n">sock_info</span><span class="p">,</span> <span class="n">serializer</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">local_connect_and_auth</span><span class="p">(</span><span class="n">port</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">int</span><span class="p">]],</span> <span class="n">auth_secret</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Tuple</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Connect to local host, authenticate with it, and return a (sockfile,sock) for that connection.</span>
<span class="sd"> Handles IPV4 &amp; IPV6, does some error handling.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> port : str or int, optional</span>
<span class="sd"> auth_secret : str</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> tuple</span>
<span class="sd"> with (sockfile, sock)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">sock</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="c1"># Support for both IPv4 and IPv6.</span>
<span class="n">addr</span> <span class="o">=</span> <span class="s2">&quot;127.0.0.1&quot;</span>
<span class="k">if</span> <span class="n">os</span><span class="o">.</span><span class="n">environ</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">&quot;SPARK_PREFER_IPV6&quot;</span><span class="p">,</span> <span class="s2">&quot;false&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span> <span class="o">==</span> <span class="s2">&quot;true&quot;</span><span class="p">:</span>
<span class="n">addr</span> <span class="o">=</span> <span class="s2">&quot;::1&quot;</span>
<span class="k">for</span> <span class="n">res</span> <span class="ow">in</span> <span class="n">socket</span><span class="o">.</span><span class="n">getaddrinfo</span><span class="p">(</span><span class="n">addr</span><span class="p">,</span> <span class="n">port</span><span class="p">,</span> <span class="n">socket</span><span class="o">.</span><span class="n">AF_UNSPEC</span><span class="p">,</span> <span class="n">socket</span><span class="o">.</span><span class="n">SOCK_STREAM</span><span class="p">):</span>
<span class="n">af</span><span class="p">,</span> <span class="n">socktype</span><span class="p">,</span> <span class="n">proto</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">sa</span> <span class="o">=</span> <span class="n">res</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">sock</span> <span class="o">=</span> <span class="n">socket</span><span class="o">.</span><span class="n">socket</span><span class="p">(</span><span class="n">af</span><span class="p">,</span> <span class="n">socktype</span><span class="p">,</span> <span class="n">proto</span><span class="p">)</span>
<span class="n">sock</span><span class="o">.</span><span class="n">settimeout</span><span class="p">(</span><span class="nb">int</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">environ</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">&quot;SPARK_AUTH_SOCKET_TIMEOUT&quot;</span><span class="p">,</span> <span class="mi">15</span><span class="p">)))</span>
<span class="n">sock</span><span class="o">.</span><span class="n">connect</span><span class="p">(</span><span class="n">sa</span><span class="p">)</span>
<span class="n">sockfile</span> <span class="o">=</span> <span class="n">sock</span><span class="o">.</span><span class="n">makefile</span><span class="p">(</span><span class="s2">&quot;rwb&quot;</span><span class="p">,</span> <span class="nb">int</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">environ</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">&quot;SPARK_BUFFER_SIZE&quot;</span><span class="p">,</span> <span class="mi">65536</span><span class="p">)))</span>
<span class="n">_do_server_auth</span><span class="p">(</span><span class="n">sockfile</span><span class="p">,</span> <span class="n">auth_secret</span><span class="p">)</span>
<span class="k">return</span> <span class="p">(</span><span class="n">sockfile</span><span class="p">,</span> <span class="n">sock</span><span class="p">)</span>
<span class="k">except</span> <span class="n">socket</span><span class="o">.</span><span class="n">error</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="n">emsg</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
<span class="n">errors</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s2">&quot;tried to connect to </span><span class="si">%s</span><span class="s2">, but an error occurred: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="p">(</span><span class="n">sa</span><span class="p">,</span> <span class="n">emsg</span><span class="p">))</span>
<span class="k">if</span> <span class="n">sock</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">sock</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="n">sock</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">errorClass</span><span class="o">=</span><span class="s2">&quot;CANNOT_OPEN_SOCKET&quot;</span><span class="p">,</span>
<span class="n">messageParameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;errors&quot;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">errors</span><span class="p">),</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">def</span> <span class="nf">_do_server_auth</span><span class="p">(</span><span class="n">conn</span><span class="p">:</span> <span class="s2">&quot;io.IOBase&quot;</span><span class="p">,</span> <span class="n">auth_secret</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Performs the authentication protocol defined by the SocketAuthHelper class on the given</span>
<span class="sd"> file-like object &#39;conn&#39;.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">write_with_length</span><span class="p">(</span><span class="n">auth_secret</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s2">&quot;utf-8&quot;</span><span class="p">),</span> <span class="n">conn</span><span class="p">)</span>
<span class="n">conn</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span>
<span class="n">reply</span> <span class="o">=</span> <span class="n">UTF8Deserializer</span><span class="p">()</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">conn</span><span class="p">)</span>
<span class="k">if</span> <span class="n">reply</span> <span class="o">!=</span> <span class="s2">&quot;ok&quot;</span><span class="p">:</span>
<span class="n">conn</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">errorClass</span><span class="o">=</span><span class="s2">&quot;UNEXPECTED_RESPONSE_FROM_SERVER&quot;</span><span class="p">,</span>
<span class="n">messageParameters</span><span class="o">=</span><span class="p">{},</span>
<span class="p">)</span>
<span class="n">_is_remote_only</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="nf">is_remote_only</span><span class="p">()</span> <span class="o">-&gt;</span> <span class="nb">bool</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns if the current running environment is only for Spark Connect.</span>
<span class="sd"> If users install pyspark-connect alone, RDD API does not exist.</span>
<span class="sd"> .. versionadded:: 4.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This will only return ``True`` if installed PySpark is only for Spark Connect.</span>
<span class="sd"> Otherwise, it returns ``False``.</span>
<span class="sd"> This API is unstable, and for developers.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> bool</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql import is_remote</span>
<span class="sd"> &gt;&gt;&gt; is_remote()</span>
<span class="sd"> False</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">global</span> <span class="n">_is_remote_only</span>
<span class="k">if</span> <span class="s2">&quot;SPARK_SKIP_CONNECT_COMPAT_TESTS&quot;</span> <span class="ow">in</span> <span class="n">os</span><span class="o">.</span><span class="n">environ</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">if</span> <span class="n">_is_remote_only</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">_is_remote_only</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">core</span> <span class="c1"># noqa: F401</span>
<span class="n">_is_remote_only</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">return</span> <span class="n">_is_remote_only</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="n">_is_remote_only</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">return</span> <span class="n">_is_remote_only</span>
<span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">&quot;__main__&quot;</span><span class="p">:</span>
<span class="k">if</span> <span class="s2">&quot;pypy&quot;</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">platform</span><span class="o">.</span><span class="n">python_implementation</span><span class="p">()</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span> <span class="ow">and</span> <span class="n">sys</span><span class="o">.</span><span class="n">version_info</span><span class="p">[:</span><span class="mi">2</span><span class="p">]</span> <span class="o">&gt;=</span> <span class="p">(</span><span class="mi">3</span><span class="p">,</span> <span class="mi">9</span><span class="p">):</span>
<span class="kn">import</span> <span class="nn">doctest</span>
<span class="kn">import</span> <span class="nn">pyspark.util</span>
<span class="kn">from</span> <span class="nn">pyspark.core.context</span> <span class="kn">import</span> <span class="n">SparkContext</span>
<span class="n">globs</span> <span class="o">=</span> <span class="n">pyspark</span><span class="o">.</span><span class="n">util</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;sc&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="s2">&quot;local[4]&quot;</span><span class="p">,</span> <span class="s2">&quot;PythonTest&quot;</span><span class="p">)</span>
<span class="p">(</span><span class="n">failure_count</span><span class="p">,</span> <span class="n">test_count</span><span class="p">)</span> <span class="o">=</span> <span class="n">doctest</span><span class="o">.</span><span class="n">testmod</span><span class="p">(</span><span class="n">pyspark</span><span class="o">.</span><span class="n">util</span><span class="p">,</span> <span class="n">globs</span><span class="o">=</span><span class="n">globs</span><span class="p">)</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;sc&quot;</span><span class="p">]</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="k">if</span> <span class="n">failure_count</span><span class="p">:</span>
<span class="n">sys</span><span class="o">.</span><span class="n">exit</span><span class="p">(</span><span class="o">-</span><span class="mi">1</span><span class="p">)</span>