blob: 2aa803e99aab445504f87b04977bd8e5ffacc573 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>pyspark.taskcontext &#8212; PySpark 4.0.0-preview2 documentation</title>
<script data-cfasync="false">
// Copy the stored display preferences onto the root <html> element as
// data-mode / data-theme attributes. When nothing is stored, mode falls back
// to "" and theme falls back to "light".
document.documentElement.dataset.mode = localStorage.getItem("mode") || "";
document.documentElement.dataset.theme = localStorage.getItem("theme") || "light";
</script>
<!-- Loaded before other Sphinx assets -->
<link href="../../_static/styles/theme.css?digest=e353d410970836974a52" rel="stylesheet" />
<link href="../../_static/styles/bootstrap.css?digest=e353d410970836974a52" rel="stylesheet" />
<link href="../../_static/styles/pydata-sphinx-theme.css?digest=e353d410970836974a52" rel="stylesheet" />
<link href="../../_static/vendor/fontawesome/6.1.2/css/all.min.css?digest=e353d410970836974a52" rel="stylesheet" />
<link rel="preload" as="font" type="font/woff2" crossorigin href="../../_static/vendor/fontawesome/6.1.2/webfonts/fa-solid-900.woff2" />
<link rel="preload" as="font" type="font/woff2" crossorigin href="../../_static/vendor/fontawesome/6.1.2/webfonts/fa-brands-400.woff2" />
<link rel="preload" as="font" type="font/woff2" crossorigin href="../../_static/vendor/fontawesome/6.1.2/webfonts/fa-regular-400.woff2" />
<link rel="stylesheet" type="text/css" href="../../_static/pygments.css" />
<link rel="stylesheet" type="text/css" href="../../_static/copybutton.css" />
<link rel="stylesheet" type="text/css" href="../../_static/css/pyspark.css" />
<!-- Pre-loaded scripts that we'll load fully later -->
<link rel="preload" as="script" href="../../_static/scripts/bootstrap.js?digest=e353d410970836974a52" />
<link rel="preload" as="script" href="../../_static/scripts/pydata-sphinx-theme.js?digest=e353d410970836974a52" />
<script data-url_root="../../" id="documentation_options" src="../../_static/documentation_options.js"></script>
<script src="../../_static/jquery.js"></script>
<script src="../../_static/underscore.js"></script>
<script src="../../_static/doctools.js"></script>
<script src="../../_static/clipboard.min.js"></script>
<script src="../../_static/copybutton.js"></script>
<script crossorigin="anonymous" integrity="sha256-Ae2Vz/4ePdIu6ZyI/5ZGsYnb+m0JlOmKPjt6XZ9JJkA=" src="https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.4/require.min.js"></script>
<script>DOCUMENTATION_OPTIONS.pagename = '_modules/pyspark/taskcontext';</script>
<link rel="canonical" href="https://spark.apache.org/docs/latest/api/python/_modules/pyspark/taskcontext.html" />
<link rel="search" title="Search" href="../../search.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="en">
<!-- Matomo -->
<script type="text/javascript">
// Reuse an existing window._paq command queue if one is already defined,
// otherwise start a new empty one; commands are appended with push().
var _paq = window._paq = window._paq || [];
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
_paq.push(["disableCookies"]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
// Base URL shared by the tracker endpoint (matomo.php) and the tracker script (matomo.js).
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '40']);
// Create an async <script> for matomo.js and insert it before the first
// <script> element found in the document.
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
<!-- End Matomo Code -->
</head>
<body data-bs-spy="scroll" data-bs-target=".bd-toc-nav" data-offset="180" data-bs-root-margin="0px 0px -60%" data-default-mode="">
<a class="skip-link" href="#main-content">Skip to main content</a>
<input type="checkbox"
class="sidebar-toggle"
name="__primary"
id="__primary"/>
<label class="overlay overlay-primary" for="__primary"></label>
<input type="checkbox"
class="sidebar-toggle"
name="__secondary"
id="__secondary"/>
<label class="overlay overlay-secondary" for="__secondary"></label>
<div class="search-button__wrapper">
<div class="search-button__overlay"></div>
<div class="search-button__search-container">
<form class="bd-search d-flex align-items-center"
action="../../search.html"
method="get">
<i class="fa-solid fa-magnifying-glass"></i>
<input type="search"
class="form-control"
name="q"
id="search-input"
placeholder="Search the docs ..."
aria-label="Search the docs ..."
autocomplete="off"
autocorrect="off"
autocapitalize="off"
spellcheck="false"/>
<span class="search-button__kbd-shortcut"><kbd class="kbd-shortcut__modifier">Ctrl</kbd>+<kbd>K</kbd></span>
</form></div>
</div>
<nav class="bd-header navbar navbar-expand-lg bd-navbar">
<div class="bd-header__inner bd-page-width">
<label class="sidebar-toggle primary-toggle" for="__primary">
<span class="fa-solid fa-bars"></span>
</label>
<div class="navbar-header-items__start">
<div class="navbar-item">
<a class="navbar-brand logo" href="../../index.html">
<img src="https://spark.apache.org/images/spark-logo.png" class="logo__image only-light" alt="Logo image"/>
<script>document.write(`<img src="https://spark.apache.org/images/spark-logo-rev.svg" class="logo__image only-dark" alt="Logo image"/>`);</script>
</a></div>
</div>
<div class="col-lg-9 navbar-header-items">
<div class="me-auto navbar-header-items__center">
<div class="navbar-item"><nav class="navbar-nav">
<p class="sidebar-header-items__title"
role="heading"
aria-level="1"
aria-label="Site Navigation">
Site Navigation
</p>
<ul class="bd-navbar-elements navbar-nav">
<li class="nav-item">
<a class="nav-link nav-internal" href="../../index.html">
Overview
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../user_guide/index.html">
User Guides
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../reference/index.html">
API Reference
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../development/index.html">
Development
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</nav></div>
</div>
<div class="navbar-header-items__end">
<div class="navbar-item navbar-persistent--container">
<script>
// Emit the header search button through document.write, so the button markup
// is only present in the page when this script has run.
document.write(`
<button class="btn btn-sm navbar-btn search-button search-button__button" title="Search" aria-label="Search" data-bs-placement="bottom" data-bs-toggle="tooltip">
<i class="fa-solid fa-magnifying-glass"></i>
</button>
`);
</script>
</div>
<div class="navbar-item"><!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
<!-- The page's other interactive attributes are Bootstrap 5 style (data-bs-*);
     Bootstrap 5 reads data-bs-toggle, not the Bootstrap 4 data-toggle.
     aria-expanded gives assistive technology the initial collapsed state. -->
<button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-bs-toggle="dropdown" aria-expanded="false">
4.0.0-preview2
<span class="caret"></span>
</button>
<div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
<!-- dropdown will be populated by javascript on page load -->
</div>
</div>
<script type="text/javascript">
// Build the docs landing-page URL for the version described by `entry`
// (only `entry.version` is read).
function buildURL(entry) {
  // URL pattern is supplied by jinja when the page is generated
  const pattern = "https://spark.apache.org/docs/{version}/api/python/index.html";
  return pattern.replace("{version}", entry.version);
}
// Click handler for the version-switcher links.
// Sends a HEAD request for this page's path under the other docs version;
// navigates there when the request succeeds and to that version's homepage
// when it fails. Always returns false so the browser does not follow the
// link on its own.
function checkPageExistsAndRedirect(event) {
  const currentFilePath = "_modules/pyspark/taskcontext.html";
  // The handler is bound to the <a> itself; prefer currentTarget so a click
  // on a nested element would still resolve to the link.
  const link = event.currentTarget || event.target;
  const otherDocsHomepage = link.getAttribute("href");
  // The hrefs built by buildURL() end in "index.html". Appending the page
  // path to that produced ".../index.html_modules/...", which is never a
  // real page, so strip the file name and append to the docs root instead.
  const otherDocsRoot = otherDocsHomepage.replace(/index\.html$/, "");
  const tryUrl = `${otherDocsRoot}${currentFilePath}`;
  $.ajax({
    type: 'HEAD',
    url: tryUrl,
    // if the page exists, go there
    success: function() {
      location.href = tryUrl;
    }
  }).fail(function() {
    // otherwise land on the homepage of the other docs version
    location.href = otherDocsHomepage;
  });
  return false;
}
// Populate the version switcher dropdown that sits next to this <script>.
(function () {
  // The page renders two dropdowns with id="version_switcher" (header and
  // sidebar), and a document-wide id lookup only ever returns the first.
  // Resolve the dropdown relative to the running script instead. This is
  // captured now because document.currentScript is null inside the
  // asynchronous callback below. Fall back to the document-wide lookup.
  const scope = (document.currentScript && document.currentScript.parentNode) || document;
  const menu = scope.querySelector("#version_switcher") ||
      document.getElementById("version_switcher");
  // get JSON config
  $.getJSON("https://spark.apache.org/static/versions.json", function(data) {
    // create the nodes in the order of the JSON entries (links point at the
    // homepage of each docs version; the click handler tries the same page)
    $.each(data, function(index, entry) {
      // if no custom name specified (e.g., "latest"), use version string
      if (!("name" in entry)) {
        entry.name = entry.version;
      }
      // construct the appropriate URL, and add it to the dropdown
      entry.url = buildURL(entry);
      const node = document.createElement("a");
      node.setAttribute("class", "list-group-item list-group-item-action py-1");
      node.setAttribute("href", `${entry.url}`);
      node.textContent = `${entry.name}`;
      node.onclick = checkPageExistsAndRedirect;
      $(menu).append(node);
    });
  });
})();
</script></div>
<div class="navbar-item">
<script>
// Emit the light/dark theme switch button through document.write, so it is
// only present when this script has run. The button carries one icon span
// per data-mode value: "light", "dark" and "auto".
document.write(`
<button class="theme-switch-button btn btn-sm btn-outline-primary navbar-btn rounded-circle" title="light/dark" aria-label="light/dark" data-bs-placement="bottom" data-bs-toggle="tooltip">
<span class="theme-switch" data-mode="light"><i class="fa-solid fa-sun"></i></span>
<span class="theme-switch" data-mode="dark"><i class="fa-solid fa-moon"></i></span>
<span class="theme-switch" data-mode="auto"><i class="fa-solid fa-circle-half-stroke"></i></span>
</button>
`);
</script></div>
<div class="navbar-item"><ul class="navbar-icon-links navbar-nav"
aria-label="Icon Links">
<li class="nav-item">
<a href="https://github.com/apache/spark" title="GitHub" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-brands fa-github"></i></span>
<label class="sr-only">GitHub</label></a>
</li>
<li class="nav-item">
<a href="https://pypi.org/project/pyspark" title="PyPI" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-solid fa-box"></i></span>
<label class="sr-only">PyPI</label></a>
</li>
</ul></div>
</div>
</div>
<div class="navbar-persistent--mobile">
<script>
// Emit the search button for the navbar-persistent--mobile container through
// document.write, so the button markup is only present when this script has run.
document.write(`
<button class="btn btn-sm navbar-btn search-button search-button__button" title="Search" aria-label="Search" data-bs-placement="bottom" data-bs-toggle="tooltip">
<i class="fa-solid fa-magnifying-glass"></i>
</button>
`);
</script>
</div>
</div>
</nav>
<div class="bd-container">
<div class="bd-container__inner bd-page-width">
<div class="bd-sidebar-primary bd-sidebar hide-on-wide">
<div class="sidebar-header-items sidebar-primary__section">
<div class="sidebar-header-items__center">
<div class="navbar-item"><nav class="navbar-nav">
<p class="sidebar-header-items__title"
role="heading"
aria-level="1"
aria-label="Site Navigation">
Site Navigation
</p>
<ul class="bd-navbar-elements navbar-nav">
<li class="nav-item">
<a class="nav-link nav-internal" href="../../index.html">
Overview
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../user_guide/index.html">
User Guides
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../reference/index.html">
API Reference
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../development/index.html">
Development
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</nav></div>
</div>
<div class="sidebar-header-items__end">
<div class="navbar-item"><!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
<!-- The page's other interactive attributes are Bootstrap 5 style (data-bs-*);
     Bootstrap 5 reads data-bs-toggle, not the Bootstrap 4 data-toggle.
     aria-expanded gives assistive technology the initial collapsed state. -->
<button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-bs-toggle="dropdown" aria-expanded="false">
4.0.0-preview2
<span class="caret"></span>
</button>
<div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
<!-- dropdown will be populated by javascript on page load -->
</div>
</div>
<script type="text/javascript">
// Build the docs landing-page URL for the version described by `entry`
// (only `entry.version` is read).
function buildURL(entry) {
  // URL pattern is supplied by jinja when the page is generated
  const pattern = "https://spark.apache.org/docs/{version}/api/python/index.html";
  return pattern.replace("{version}", entry.version);
}
// Click handler for the version-switcher links.
// Sends a HEAD request for this page's path under the other docs version;
// navigates there when the request succeeds and to that version's homepage
// when it fails. Always returns false so the browser does not follow the
// link on its own.
function checkPageExistsAndRedirect(event) {
  const currentFilePath = "_modules/pyspark/taskcontext.html";
  // The handler is bound to the <a> itself; prefer currentTarget so a click
  // on a nested element would still resolve to the link.
  const link = event.currentTarget || event.target;
  const otherDocsHomepage = link.getAttribute("href");
  // The hrefs built by buildURL() end in "index.html". Appending the page
  // path to that produced ".../index.html_modules/...", which is never a
  // real page, so strip the file name and append to the docs root instead.
  const otherDocsRoot = otherDocsHomepage.replace(/index\.html$/, "");
  const tryUrl = `${otherDocsRoot}${currentFilePath}`;
  $.ajax({
    type: 'HEAD',
    url: tryUrl,
    // if the page exists, go there
    success: function() {
      location.href = tryUrl;
    }
  }).fail(function() {
    // otherwise land on the homepage of the other docs version
    location.href = otherDocsHomepage;
  });
  return false;
}
// Populate the version switcher dropdown that sits next to this <script>.
(function () {
  // The page renders two dropdowns with id="version_switcher" (header and
  // sidebar), and a document-wide id lookup only ever returns the first.
  // Resolve the dropdown relative to the running script instead. This is
  // captured now because document.currentScript is null inside the
  // asynchronous callback below. Fall back to the document-wide lookup.
  const scope = (document.currentScript && document.currentScript.parentNode) || document;
  const menu = scope.querySelector("#version_switcher") ||
      document.getElementById("version_switcher");
  // get JSON config
  $.getJSON("https://spark.apache.org/static/versions.json", function(data) {
    // create the nodes in the order of the JSON entries (links point at the
    // homepage of each docs version; the click handler tries the same page)
    $.each(data, function(index, entry) {
      // if no custom name specified (e.g., "latest"), use version string
      if (!("name" in entry)) {
        entry.name = entry.version;
      }
      // construct the appropriate URL, and add it to the dropdown
      entry.url = buildURL(entry);
      const node = document.createElement("a");
      node.setAttribute("class", "list-group-item list-group-item-action py-1");
      node.setAttribute("href", `${entry.url}`);
      node.textContent = `${entry.name}`;
      node.onclick = checkPageExistsAndRedirect;
      $(menu).append(node);
    });
  });
})();
</script></div>
<div class="navbar-item">
<script>
// Emit the light/dark theme switch button through document.write, so it is
// only present when this script has run. The button carries one icon span
// per data-mode value: "light", "dark" and "auto".
document.write(`
<button class="theme-switch-button btn btn-sm btn-outline-primary navbar-btn rounded-circle" title="light/dark" aria-label="light/dark" data-bs-placement="bottom" data-bs-toggle="tooltip">
<span class="theme-switch" data-mode="light"><i class="fa-solid fa-sun"></i></span>
<span class="theme-switch" data-mode="dark"><i class="fa-solid fa-moon"></i></span>
<span class="theme-switch" data-mode="auto"><i class="fa-solid fa-circle-half-stroke"></i></span>
</button>
`);
</script></div>
<div class="navbar-item"><ul class="navbar-icon-links navbar-nav"
aria-label="Icon Links">
<li class="nav-item">
<a href="https://github.com/apache/spark" title="GitHub" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-brands fa-github"></i></span>
<label class="sr-only">GitHub</label></a>
</li>
<li class="nav-item">
<a href="https://pypi.org/project/pyspark" title="PyPI" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-solid fa-box"></i></span>
<label class="sr-only">PyPI</label></a>
</li>
</ul></div>
</div>
</div>
<div class="sidebar-primary-items__end sidebar-primary__section">
</div>
<div id="rtd-footer-container"></div>
</div>
<main id="main-content" class="bd-main">
<div class="bd-content">
<div class="bd-article-container">
<div class="bd-header-article">
<div class="header-article-items header-article__inner">
<div class="header-article-items__start">
<div class="header-article-item">
<!-- Breadcrumb trail. The enclosing <nav> is the (labelled) navigation
     landmark; the list keeps its native list semantics, so it carries no
     role or label of its own. -->
<nav aria-label="Breadcrumbs">
  <ul class="bd-breadcrumbs">
    <li class="breadcrumb-item breadcrumb-home">
      <a href="../../index.html" class="nav-link" aria-label="Home">
        <!-- decorative icon: the link's name comes from aria-label -->
        <i class="fa-solid fa-home" aria-hidden="true"></i>
      </a>
    </li>
    <li class="breadcrumb-item"><a href="../index.html" class="nav-link">Module code</a></li>
    <li class="breadcrumb-item active" aria-current="page">pyspark.taskcontext</li>
  </ul>
</nav>
</div>
</div>
</div>
</div>
<div id="searchbox"></div>
<article class="bd-article" role="main">
<h1>Source code for pyspark.taskcontext</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">ClassVar</span><span class="p">,</span> <span class="n">Type</span><span class="p">,</span> <span class="n">Dict</span><span class="p">,</span> <span class="n">List</span><span class="p">,</span> <span class="n">Optional</span><span class="p">,</span> <span class="n">Union</span><span class="p">,</span> <span class="n">cast</span><span class="p">,</span> <span class="n">TYPE_CHECKING</span>
<span class="kn">from</span> <span class="nn">pyspark.util</span> <span class="kn">import</span> <span class="n">local_connect_and_auth</span>
<span class="kn">from</span> <span class="nn">pyspark.serializers</span> <span class="kn">import</span> <span class="n">read_int</span><span class="p">,</span> <span class="n">write_int</span><span class="p">,</span> <span class="n">write_with_length</span><span class="p">,</span> <span class="n">UTF8Deserializer</span>
<span class="kn">from</span> <span class="nn">pyspark.errors</span> <span class="kn">import</span> <span class="n">PySparkRuntimeError</span>
<span class="k">if</span> <span class="n">TYPE_CHECKING</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">pyspark.resource</span> <span class="kn">import</span> <span class="n">ResourceInformation</span>
<div class="viewcode-block" id="TaskContext"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.html#pyspark.TaskContext">[docs]</a><span class="k">class</span> <span class="nc">TaskContext</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Contextual information about a task which can be read or mutated during</span>
<span class="sd"> execution. To access the TaskContext for a running task, use:</span>
<span class="sd"> :meth:`TaskContext.get`.</span>
<span class="sd"> .. versionadded:: 2.2.0</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; from pyspark import TaskContext</span>
<span class="sd"> Get a task context instance from :class:`RDD`.</span>
<span class="sd"> &gt;&gt;&gt; spark.sparkContext.setLocalProperty(&quot;key1&quot;, &quot;value&quot;)</span>
<span class="sd"> &gt;&gt;&gt; taskcontext = spark.sparkContext.parallelize([1]).map(lambda _: TaskContext.get()).first()</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskcontext.attemptNumber(), int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskcontext.partitionId(), int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskcontext.stageId(), int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskcontext.taskAttemptId(), int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; taskcontext.getLocalProperty(&quot;key1&quot;)</span>
<span class="sd"> &#39;value&#39;</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskcontext.cpus(), int)</span>
<span class="sd"> True</span>
<span class="sd"> Get a task context instance from a dataframe via Python UDF.</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql import Row</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.functions import udf</span>
<span class="sd"> &gt;&gt;&gt; @udf(&quot;STRUCT&lt;anum: INT, partid: INT, stageid: INT, taskaid: INT, prop: STRING, cpus: INT&gt;&quot;)</span>
<span class="sd"> ... def taskcontext_as_row():</span>
<span class="sd"> ... taskcontext = TaskContext.get()</span>
<span class="sd"> ... return Row(</span>
<span class="sd"> ... anum=taskcontext.attemptNumber(),</span>
<span class="sd"> ... partid=taskcontext.partitionId(),</span>
<span class="sd"> ... stageid=taskcontext.stageId(),</span>
<span class="sd"> ... taskaid=taskcontext.taskAttemptId(),</span>
<span class="sd"> ... prop=taskcontext.getLocalProperty(&quot;key2&quot;),</span>
<span class="sd"> ... cpus=taskcontext.cpus())</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; spark.sparkContext.setLocalProperty(&quot;key2&quot;, &quot;value&quot;)</span>
<span class="sd"> &gt;&gt;&gt; [(anum, partid, stageid, taskaid, prop, cpus)] = (</span>
<span class="sd"> ... spark.range(1).select(taskcontext_as_row()).first()</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; isinstance(anum, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(partid, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(stageid, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskaid, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; prop</span>
<span class="sd"> &#39;value&#39;</span>
<span class="sd"> &gt;&gt;&gt; isinstance(cpus, int)</span>
<span class="sd"> True</span>
<span class="sd"> Get a task context instance from a dataframe via Pandas UDF.</span>
<span class="sd"> &gt;&gt;&gt; import pandas as pd # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.functions import pandas_udf</span>
<span class="sd"> &gt;&gt;&gt; @pandas_udf(&quot;STRUCT&lt;&quot;</span>
<span class="sd"> ... &quot;anum: INT, partid: INT, stageid: INT, taskaid: INT, prop: STRING, cpus: INT&gt;&quot;)</span>
<span class="sd"> ... def taskcontext_as_row(_):</span>
<span class="sd"> ... taskcontext = TaskContext.get()</span>
<span class="sd"> ... return pd.DataFrame({</span>
<span class="sd"> ... &quot;anum&quot;: [taskcontext.attemptNumber()],</span>
<span class="sd"> ... &quot;partid&quot;: [taskcontext.partitionId()],</span>
<span class="sd"> ... &quot;stageid&quot;: [taskcontext.stageId()],</span>
<span class="sd"> ... &quot;taskaid&quot;: [taskcontext.taskAttemptId()],</span>
<span class="sd"> ... &quot;prop&quot;: [taskcontext.getLocalProperty(&quot;key3&quot;)],</span>
<span class="sd"> ... &quot;cpus&quot;: [taskcontext.cpus()]</span>
<span class="sd"> ... }) # doctest: +SKIP</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; spark.sparkContext.setLocalProperty(&quot;key3&quot;, &quot;value&quot;) # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; [(anum, partid, stageid, taskaid, prop, cpus)] = (</span>
<span class="sd"> ... spark.range(1).select(taskcontext_as_row(&quot;id&quot;)).first()</span>
<span class="sd"> ... ) # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; isinstance(anum, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(partid, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(stageid, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskaid, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; prop</span>
<span class="sd"> &#39;value&#39;</span>
<span class="sd"> &gt;&gt;&gt; isinstance(cpus, int)</span>
<span class="sd"> True</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">_taskContext</span><span class="p">:</span> <span class="n">ClassVar</span><span class="p">[</span><span class="n">Optional</span><span class="p">[</span><span class="s2">&quot;TaskContext&quot;</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_attemptNumber</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_partitionId</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_stageId</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_taskAttemptId</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_localProperties</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_cpus</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_resources</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="s2">&quot;ResourceInformation&quot;</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="fm">__new__</span><span class="p">(</span><span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;TaskContext&quot;</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;TaskContext&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Even if users construct :class:`TaskContext` instead of using get, give them the singleton.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">taskContext</span> <span class="o">=</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span>
<span class="k">if</span> <span class="n">taskContext</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">taskContext</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span> <span class="o">=</span> <span class="n">taskContext</span> <span class="o">=</span> <span class="nb">object</span><span class="o">.</span><span class="fm">__new__</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span>
<span class="k">return</span> <span class="n">taskContext</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_getOrCreate</span><span class="p">(</span><span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;TaskContext&quot;</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;TaskContext&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Internal function to get or create global :class:`TaskContext`.&quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span> <span class="o">=</span> <span class="n">TaskContext</span><span class="p">()</span>
<span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_setTaskContext</span><span class="p">(</span><span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;TaskContext&quot;</span><span class="p">],</span> <span class="n">taskContext</span><span class="p">:</span> <span class="s2">&quot;TaskContext&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span> <span class="o">=</span> <span class="n">taskContext</span>
<div class="viewcode-block" id="TaskContext.get"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.get.html#pyspark.TaskContext.get">[docs]</a> <span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">get</span><span class="p">(</span><span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;TaskContext&quot;</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="s2">&quot;TaskContext&quot;</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return the currently active :class:`TaskContext`. This can be called inside of</span>
<span class="sd"> user functions to access contextual information about running tasks.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> :class:`TaskContext`, optional</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> Must be called on the worker, not the driver. Returns ``None`` if not initialized.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span></div>
<div class="viewcode-block" id="TaskContext.stageId"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.stageId.html#pyspark.TaskContext.stageId">[docs]</a> <span class="k">def</span> <span class="nf">stageId</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">int</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> The ID of the stage that this task belongs to.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> int</span>
<span class="sd"> current stage id.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_stageId</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskContext.partitionId"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.partitionId.html#pyspark.TaskContext.partitionId">[docs]</a> <span class="k">def</span> <span class="nf">partitionId</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">int</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> The ID of the RDD partition that is computed by this task.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> int</span>
<span class="sd"> current partition id.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_partitionId</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskContext.attemptNumber"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.attemptNumber.html#pyspark.TaskContext.attemptNumber">[docs]</a> <span class="k">def</span> <span class="nf">attemptNumber</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">int</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> How many times this task has been attempted. The first task attempt will be assigned</span>
<span class="sd"> attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> int</span>
<span class="sd"> current attempt number.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_attemptNumber</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskContext.taskAttemptId"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.taskAttemptId.html#pyspark.TaskContext.taskAttemptId">[docs]</a> <span class="k">def</span> <span class="nf">taskAttemptId</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">int</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> An ID that is unique to this task attempt (within the same :class:`SparkContext`,</span>
<span class="sd"> no two task attempts will share the same attempt ID). This is roughly equivalent</span>
<span class="sd"> to Hadoop&#39;s `TaskAttemptID`.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> int</span>
<span class="sd"> current task attempt id.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_taskAttemptId</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskContext.getLocalProperty"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.getLocalProperty.html#pyspark.TaskContext.getLocalProperty">[docs]</a> <span class="k">def</span> <span class="nf">getLocalProperty</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Get a local property set upstream in the driver, or None if it is missing.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> key : str</span>
<span class="sd"> the key of the local property to get.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> str</span>
<span class="sd"> the value of the local property.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">],</span> <span class="bp">self</span><span class="o">.</span><span class="n">_localProperties</span><span class="p">)</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskContext.cpus"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.cpus.html#pyspark.TaskContext.cpus">[docs]</a> <span class="k">def</span> <span class="nf">cpus</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">int</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> CPUs allocated to the task.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> int</span>
<span class="sd"> the number of CPUs.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cpus</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskContext.resources"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.resources.html#pyspark.TaskContext.resources">[docs]</a> <span class="k">def</span> <span class="nf">resources</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="s2">&quot;ResourceInformation&quot;</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Resources allocated to the task. The key is the resource name and the value is information</span>
<span class="sd"> about the resource.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> dict</span>
<span class="sd"> a dictionary of a string resource name, and :class:`ResourceInformation`.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">pyspark.resource</span> <span class="kn">import</span> <span class="n">ResourceInformation</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="s2">&quot;ResourceInformation&quot;</span><span class="p">],</span> <span class="bp">self</span><span class="o">.</span><span class="n">_resources</span><span class="p">)</span></div></div>
<span class="n">BARRIER_FUNCTION</span> <span class="o">=</span> <span class="mi">1</span>
<span class="n">ALL_GATHER_FUNCTION</span> <span class="o">=</span> <span class="mi">2</span>
<span class="k">def</span> <span class="nf">_load_from_socket</span><span class="p">(</span>
<span class="n">port</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">int</span><span class="p">]],</span>
<span class="n">auth_secret</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">function</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">all_gather_message</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Load data from a given socket. This is a blocking method and thus only returns when the socket</span>
<span class="sd"> connection has been closed.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="p">(</span><span class="n">sockfile</span><span class="p">,</span> <span class="n">sock</span><span class="p">)</span> <span class="o">=</span> <span class="n">local_connect_and_auth</span><span class="p">(</span><span class="n">port</span><span class="p">,</span> <span class="n">auth_secret</span><span class="p">)</span>
<span class="c1"># The call may block forever, so no timeout</span>
<span class="n">sock</span><span class="o">.</span><span class="n">settimeout</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span>
<span class="k">if</span> <span class="n">function</span> <span class="o">==</span> <span class="n">BARRIER_FUNCTION</span><span class="p">:</span>
<span class="c1"># Make a barrier() function call.</span>
<span class="n">write_int</span><span class="p">(</span><span class="n">function</span><span class="p">,</span> <span class="n">sockfile</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">function</span> <span class="o">==</span> <span class="n">ALL_GATHER_FUNCTION</span><span class="p">:</span>
<span class="c1"># Make an all_gather() function call.</span>
<span class="n">write_int</span><span class="p">(</span><span class="n">function</span><span class="p">,</span> <span class="n">sockfile</span><span class="p">)</span>
<span class="n">write_with_length</span><span class="p">(</span><span class="n">cast</span><span class="p">(</span><span class="nb">str</span><span class="p">,</span> <span class="n">all_gather_message</span><span class="p">)</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s2">&quot;utf-8&quot;</span><span class="p">),</span> <span class="n">sockfile</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;Unrecognized function type&quot;</span><span class="p">)</span>
<span class="n">sockfile</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span>
<span class="c1"># Collect result.</span>
<span class="nb">len</span> <span class="o">=</span> <span class="n">read_int</span><span class="p">(</span><span class="n">sockfile</span><span class="p">)</span>
<span class="n">res</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="nb">len</span><span class="p">):</span>
<span class="n">res</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">UTF8Deserializer</span><span class="p">()</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">sockfile</span><span class="p">))</span>
<span class="c1"># Release resources.</span>
<span class="n">sockfile</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="n">sock</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="k">return</span> <span class="n">res</span>
<div class="viewcode-block" id="BarrierTaskContext"><a class="viewcode-back" href="../../reference/api/pyspark.BarrierTaskContext.html#pyspark.BarrierTaskContext">[docs]</a><span class="k">class</span> <span class="nc">BarrierTaskContext</span><span class="p">(</span><span class="n">TaskContext</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A :class:`TaskContext` with extra contextual info and tooling for tasks in a barrier stage.</span>
<span class="sd"> Use :func:`BarrierTaskContext.get` to obtain the barrier context for a running barrier task.</span>
<span class="sd"> .. versionadded:: 2.4.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is experimental</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> Set a barrier, and execute it with RDD.</span>
<span class="sd"> &gt;&gt;&gt; from pyspark import BarrierTaskContext</span>
<span class="sd"> &gt;&gt;&gt; def block_and_do_something(itr):</span>
<span class="sd"> ... taskcontext = BarrierTaskContext.get()</span>
<span class="sd"> ... # Do something.</span>
<span class="sd"> ...</span>
<span class="sd"> ... # Wait until all tasks finished.</span>
<span class="sd"> ... taskcontext.barrier()</span>
<span class="sd"> ...</span>
<span class="sd"> ... return itr</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; rdd = spark.sparkContext.parallelize([1])</span>
<span class="sd"> &gt;&gt;&gt; rdd.barrier().mapPartitions(block_and_do_something).collect()</span>
<span class="sd"> [1]</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">_port</span><span class="p">:</span> <span class="n">ClassVar</span><span class="p">[</span><span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">int</span><span class="p">]]]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_secret</span><span class="p">:</span> <span class="n">ClassVar</span><span class="p">[</span><span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_getOrCreate</span><span class="p">(</span><span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Internal function to get or create global :class:`BarrierTaskContext`. We need to make sure</span>
<span class="sd"> :class:`BarrierTaskContext` is returned from here because it is needed in python worker</span>
<span class="sd"> reuse scenario, see SPARK-25921 for more details.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span><span class="p">,</span> <span class="n">BarrierTaskContext</span><span class="p">):</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span> <span class="o">=</span> <span class="nb">object</span><span class="o">.</span><span class="fm">__new__</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span>
<div class="viewcode-block" id="BarrierTaskContext.get"><a class="viewcode-back" href="../../reference/api/pyspark.BarrierTaskContext.get.html#pyspark.BarrierTaskContext.get">[docs]</a> <span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">get</span><span class="p">(</span><span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return the currently active :class:`BarrierTaskContext`.</span>
<span class="sd"> This can be called inside of user functions to access contextual information about</span>
<span class="sd"> running tasks.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> Must be called on the worker, not the driver. Returns ``None`` if not initialized.</span>
<span class="sd"> An exception will be raised if it is not in a barrier stage.</span>
<span class="sd"> This API is experimental</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span><span class="p">,</span> <span class="n">BarrierTaskContext</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">errorClass</span><span class="o">=</span><span class="s2">&quot;NOT_IN_BARRIER_STAGE&quot;</span><span class="p">,</span>
<span class="n">messageParameters</span><span class="o">=</span><span class="p">{},</span>
<span class="p">)</span>
<span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span></div>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_initialize</span><span class="p">(</span>
<span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">],</span> <span class="n">port</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">int</span><span class="p">]],</span> <span class="n">secret</span><span class="p">:</span> <span class="nb">str</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Initialize :class:`BarrierTaskContext`, other methods within :class:`BarrierTaskContext`</span>
<span class="sd"> can only be called after BarrierTaskContext is initialized.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_port</span> <span class="o">=</span> <span class="n">port</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_secret</span> <span class="o">=</span> <span class="n">secret</span>
<div class="viewcode-block" id="BarrierTaskContext.barrier"><a class="viewcode-back" href="../../reference/api/pyspark.BarrierTaskContext.barrier.html#pyspark.BarrierTaskContext.barrier">[docs]</a> <span class="k">def</span> <span class="nf">barrier</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Sets a global barrier and waits until all tasks in this stage hit this barrier.</span>
<span class="sd"> Similar to `MPI_Barrier` function in MPI, this function blocks until all tasks</span>
<span class="sd"> in the same stage have reached this routine.</span>
<span class="sd"> .. versionadded:: 2.4.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is experimental</span>
<span class="sd"> In a barrier stage, each task must have the same number of `barrier()`</span>
<span class="sd"> calls, in all possible code branches. Otherwise, you may get the job hanging</span>
<span class="sd"> or a `SparkException` after timeout.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_port</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">_secret</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">errorClass</span><span class="o">=</span><span class="s2">&quot;CALL_BEFORE_INITIALIZE&quot;</span><span class="p">,</span>
<span class="n">messageParameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;func_name&quot;</span><span class="p">:</span> <span class="s2">&quot;barrier&quot;</span><span class="p">,</span>
<span class="s2">&quot;object&quot;</span><span class="p">:</span> <span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">,</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">_load_from_socket</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_port</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_secret</span><span class="p">,</span> <span class="n">BARRIER_FUNCTION</span><span class="p">)</span></div>
<div class="viewcode-block" id="BarrierTaskContext.allGather"><a class="viewcode-back" href="../../reference/api/pyspark.BarrierTaskContext.allGather.html#pyspark.BarrierTaskContext.allGather">[docs]</a> <span class="k">def</span> <span class="nf">allGather</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">message</span><span class="p">:</span> <span class="nb">str</span> <span class="o">=</span> <span class="s2">&quot;&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> This function blocks until all tasks in the same stage have reached this routine.</span>
<span class="sd"> Each task passes in a message and returns with a list of all the messages passed in</span>
<span class="sd"> by each of those tasks.</span>
<span class="sd"> .. versionadded:: 3.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is experimental</span>
<span class="sd"> In a barrier stage, each task must have the same number of `barrier()`</span>
<span class="sd"> calls, in all possible code branches. Otherwise, you may get the job hanging</span>
<span class="sd"> or a `SparkException` after timeout.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">message</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;Argument `message` must be of type `str`&quot;</span><span class="p">)</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_port</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">_secret</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">errorClass</span><span class="o">=</span><span class="s2">&quot;CALL_BEFORE_INITIALIZE&quot;</span><span class="p">,</span>
<span class="n">messageParameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;func_name&quot;</span><span class="p">:</span> <span class="s2">&quot;allGather&quot;</span><span class="p">,</span>
<span class="s2">&quot;object&quot;</span><span class="p">:</span> <span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">,</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">_load_from_socket</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_port</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_secret</span><span class="p">,</span> <span class="n">ALL_GATHER_FUNCTION</span><span class="p">,</span> <span class="n">message</span><span class="p">)</span></div>
<div class="viewcode-block" id="BarrierTaskContext.getTaskInfos"><a class="viewcode-back" href="../../reference/api/pyspark.BarrierTaskContext.getTaskInfos.html#pyspark.BarrierTaskContext.getTaskInfos">[docs]</a> <span class="k">def</span> <span class="nf">getTaskInfos</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">List</span><span class="p">[</span><span class="s2">&quot;BarrierTaskInfo&quot;</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns :class:`BarrierTaskInfo` for all tasks in this barrier stage,</span>
<span class="sd"> ordered by partition ID.</span>
<span class="sd"> .. versionadded:: 2.4.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is experimental</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; from pyspark import BarrierTaskContext</span>
<span class="sd"> &gt;&gt;&gt; rdd = spark.sparkContext.parallelize([1])</span>
<span class="sd"> &gt;&gt;&gt; barrier_info = rdd.barrier().mapPartitions(</span>
<span class="sd"> ... lambda _: [BarrierTaskContext.get().getTaskInfos()]).collect()[0][0]</span>
<span class="sd"> &gt;&gt;&gt; barrier_info.address</span>
<span class="sd"> &#39;...:...&#39;</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_port</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">_secret</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">errorClass</span><span class="o">=</span><span class="s2">&quot;CALL_BEFORE_INITIALIZE&quot;</span><span class="p">,</span>
<span class="n">messageParameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;func_name&quot;</span><span class="p">:</span> <span class="s2">&quot;getTaskInfos&quot;</span><span class="p">,</span>
<span class="s2">&quot;object&quot;</span><span class="p">:</span> <span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">,</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">addresses</span> <span class="o">=</span> <span class="n">cast</span><span class="p">(</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">],</span> <span class="bp">self</span><span class="o">.</span><span class="n">_localProperties</span><span class="p">)</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">&quot;addresses&quot;</span><span class="p">,</span> <span class="s2">&quot;&quot;</span><span class="p">)</span>
<span class="k">return</span> <span class="p">[</span><span class="n">BarrierTaskInfo</span><span class="p">(</span><span class="n">h</span><span class="o">.</span><span class="n">strip</span><span class="p">())</span> <span class="k">for</span> <span class="n">h</span> <span class="ow">in</span> <span class="n">addresses</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s2">&quot;,&quot;</span><span class="p">)]</span></div></div>
<div class="viewcode-block" id="BarrierTaskInfo"><a class="viewcode-back" href="../../reference/api/pyspark.BarrierTaskInfo.html#pyspark.BarrierTaskInfo">[docs]</a><span class="k">class</span> <span class="nc">BarrierTaskInfo</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Carries all task infos of a barrier task.</span>
<span class="sd"> .. versionadded:: 2.4.0</span>
<span class="sd"> Attributes</span>
<span class="sd"> ----------</span>
<span class="sd"> address : str</span>
<span class="sd"> The IPv4 address (host:port) of the executor that the barrier task is running on</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is experimental</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">address</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">address</span> <span class="o">=</span> <span class="n">address</span></div>
<span class="k">def</span> <span class="nf">_test</span><span class="p">()</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">doctest</span>
<span class="kn">import</span> <span class="nn">sys</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span>
<span class="n">globs</span> <span class="o">=</span> <span class="nb">globals</span><span class="p">()</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;spark&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">SparkSession</span><span class="o">.</span><span class="n">builder</span><span class="o">.</span><span class="n">master</span><span class="p">(</span><span class="s2">&quot;local[2]&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">appName</span><span class="p">(</span><span class="s2">&quot;taskcontext tests&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">getOrCreate</span><span class="p">()</span>
<span class="p">)</span>
<span class="p">(</span><span class="n">failure_count</span><span class="p">,</span> <span class="n">test_count</span><span class="p">)</span> <span class="o">=</span> <span class="n">doctest</span><span class="o">.</span><span class="n">testmod</span><span class="p">(</span><span class="n">globs</span><span class="o">=</span><span class="n">globs</span><span class="p">,</span> <span class="n">optionflags</span><span class="o">=</span><span class="n">doctest</span><span class="o">.</span><span class="n">ELLIPSIS</span><span class="p">)</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;spark&quot;</span><span class="p">]</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="k">if</span> <span class="n">failure_count</span><span class="p">:</span>
<span class="n">sys</span><span class="o">.</span><span class="n">exit</span><span class="p">(</span><span class="o">-</span><span class="mi">1</span><span class="p">)</span>
<span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">&quot;__main__&quot;</span><span class="p">:</span>
<span class="n">_test</span><span class="p">()</span>
</pre></div>
</article>
<footer class="bd-footer-article">
<div class="footer-article-items footer-article__inner">
<div class="footer-article-item"><!-- Previous / next buttons -->
<div class="prev-next-area">
</div></div>
</div>
</footer>
</div>
</div>
<footer class="bd-footer-content">
</footer>
</main>
</div>
</div>
<!-- Scripts loaded after <body> so the DOM is not blocked -->
<script src="../../_static/scripts/bootstrap.js?digest=e353d410970836974a52"></script>
<script src="../../_static/scripts/pydata-sphinx-theme.js?digest=e353d410970836974a52"></script>
<footer class="bd-footer">
<div class="bd-footer__inner bd-page-width">
<div class="footer-items__start">
<div class="footer-item"><p class="copyright">
Copyright © 2024 The Apache Software Foundation, Licensed under the <a href="https://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
</p></div>
<div class="footer-item">
<p class="sphinx-version">
Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 4.5.0.
<br/>
</p>
</div>
</div>
<div class="footer-items__end">
<div class="footer-item"><p class="theme-version">
Built with the <a href="https://pydata-sphinx-theme.readthedocs.io/en/stable/index.html">PyData Sphinx Theme</a> 0.13.3.
</p></div>
</div>
</div>
</footer>
</body>
</html>