<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>pyspark.taskcontext &#8212; PySpark 3.5.0 documentation</title>
<link href="../../_static/styles/theme.css?digest=1999514e3f237ded88cf" rel="stylesheet">
<link href="../../_static/styles/pydata-sphinx-theme.css?digest=1999514e3f237ded88cf" rel="stylesheet">
<link rel="stylesheet"
href="../../_static/vendor/fontawesome/5.13.0/css/all.min.css">
<link rel="preload" as="font" type="font/woff2" crossorigin
href="../../_static/vendor/fontawesome/5.13.0/webfonts/fa-solid-900.woff2">
<link rel="preload" as="font" type="font/woff2" crossorigin
href="../../_static/vendor/fontawesome/5.13.0/webfonts/fa-brands-400.woff2">
<link rel="stylesheet" href="../../_static/styles/pydata-sphinx-theme.css" type="text/css" />
<link rel="stylesheet" href="../../_static/pygments.css" type="text/css" />
<link rel="stylesheet" type="text/css" href="../../_static/css/pyspark.css" />
<link rel="preload" as="script" href="../../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf">
<script id="documentation_options" data-url_root="../../" src="../../_static/documentation_options.js"></script>
<script src="../../_static/jquery.js"></script>
<script src="../../_static/underscore.js"></script>
<script src="../../_static/doctools.js"></script>
<script src="../../_static/language_data.js"></script>
<script src="../../_static/copybutton.js"></script>
<script crossorigin="anonymous" integrity="sha256-Ae2Vz/4ePdIu6ZyI/5ZGsYnb+m0JlOmKPjt6XZ9JJkA=" src="https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.4/require.min.js"></script>
<script async="async" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
<script type="text/x-mathjax-config">MathJax.Hub.Config({"tex2jax": {"inlineMath": [["$", "$"], ["\\(", "\\)"]], "processEscapes": true, "ignoreClass": "document", "processClass": "math|output_area"}})</script>
<link rel="canonical" href="https://spark.apache.org/docs/latest/api/python/_modules/pyspark/taskcontext.html" />
<link rel="search" title="Search" href="../../search.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="None">
</head>
<body data-spy="scroll" data-target="#bd-toc-nav" data-offset="80">
<div class="container-fluid" id="banner"></div>
<nav class="navbar navbar-light navbar-expand-lg bg-light fixed-top bd-navbar" id="navbar-main"><div class="container-xl">
<div id="navbar-start">
<a class="navbar-brand" href="../../index.html">
<img src="../../_static/spark-logo-reverse.png" class="logo" alt="logo">
</a>
</div>
<button class="navbar-toggler" type="button" data-toggle="collapse" data-target="#navbar-collapsible" aria-controls="navbar-collapsible" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div id="navbar-collapsible" class="col-lg-9 collapse navbar-collapse">
<div id="navbar-center" class="mr-auto">
<div class="navbar-center-item">
<ul id="navbar-main-elements" class="navbar-nav">
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../index.html">
Overview
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../user_guide/index.html">
User Guides
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../reference/index.html">
API Reference
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../development/index.html">
Development
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</div>
</div>
<div id="navbar-end">
<div class="navbar-end-item">
<ul id="navbar-icon-links" class="navbar-nav" aria-label="Icon Links">
</ul>
</div>
</div>
</div>
</div>
</nav>
<div class="container-xl">
<div class="row">
<!-- Only show if we have sidebars configured, else just a small margin -->
<div class="col-12 col-md-3 bd-sidebar">
<div class="sidebar-start-items"><form class="bd-search d-flex align-items-center" action="../../search.html" method="get">
<i class="icon fas fa-search"></i>
<input type="search" class="form-control" name="q" id="search-input" placeholder="Search the docs ..." aria-label="Search the docs ..." autocomplete="off" >
</form><nav class="bd-links" id="bd-docs-nav" aria-label="Main navigation">
<div class="bd-toc-item active">
</div>
</nav>
</div>
<div class="sidebar-end-items">
</div>
</div>
<div class="d-none d-xl-block col-xl-2 bd-toc">
</div>
<main class="col-12 col-md-9 col-xl-7 py-md-5 pl-md-5 pr-md-4 bd-content" role="main">
<div>
<h1>Source code for pyspark.taskcontext</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">ClassVar</span><span class="p">,</span> <span class="n">Type</span><span class="p">,</span> <span class="n">Dict</span><span class="p">,</span> <span class="n">List</span><span class="p">,</span> <span class="n">Optional</span><span class="p">,</span> <span class="n">Union</span><span class="p">,</span> <span class="n">cast</span>
<span class="kn">from</span> <span class="nn">pyspark.java_gateway</span> <span class="kn">import</span> <span class="n">local_connect_and_auth</span>
<span class="kn">from</span> <span class="nn">pyspark.resource</span> <span class="kn">import</span> <span class="n">ResourceInformation</span>
<span class="kn">from</span> <span class="nn">pyspark.serializers</span> <span class="kn">import</span> <span class="n">read_int</span><span class="p">,</span> <span class="n">write_int</span><span class="p">,</span> <span class="n">write_with_length</span><span class="p">,</span> <span class="n">UTF8Deserializer</span>
<span class="kn">from</span> <span class="nn">pyspark.errors</span> <span class="kn">import</span> <span class="n">PySparkRuntimeError</span>
<div class="viewcode-block" id="TaskContext"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.html#pyspark.TaskContext">[docs]</a><span class="k">class</span> <span class="nc">TaskContext</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Contextual information about a task which can be read or mutated during</span>
<span class="sd"> execution. To access the TaskContext for a running task, use:</span>
<span class="sd"> :meth:`TaskContext.get`.</span>
<span class="sd"> .. versionadded:: 2.2.0</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; from pyspark import TaskContext</span>
<span class="sd"> Get a task context instance from :class:`RDD`.</span>
<span class="sd"> &gt;&gt;&gt; spark.sparkContext.setLocalProperty(&quot;key1&quot;, &quot;value&quot;)</span>
<span class="sd"> &gt;&gt;&gt; taskcontext = spark.sparkContext.parallelize([1]).map(lambda _: TaskContext.get()).first()</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskcontext.attemptNumber(), int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskcontext.partitionId(), int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskcontext.stageId(), int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskcontext.taskAttemptId(), int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; taskcontext.getLocalProperty(&quot;key1&quot;)</span>
<span class="sd"> &#39;value&#39;</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskcontext.cpus(), int)</span>
<span class="sd"> True</span>
<span class="sd"> Get a task context instance from a dataframe via Python UDF.</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql import Row</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.functions import udf</span>
<span class="sd"> &gt;&gt;&gt; @udf(&quot;STRUCT&lt;anum: INT, partid: INT, stageid: INT, taskaid: INT, prop: STRING, cpus: INT&gt;&quot;)</span>
<span class="sd"> ... def taskcontext_as_row():</span>
<span class="sd"> ... taskcontext = TaskContext.get()</span>
<span class="sd"> ... return Row(</span>
<span class="sd"> ... anum=taskcontext.attemptNumber(),</span>
<span class="sd"> ... partid=taskcontext.partitionId(),</span>
<span class="sd"> ... stageid=taskcontext.stageId(),</span>
<span class="sd"> ... taskaid=taskcontext.taskAttemptId(),</span>
<span class="sd"> ... prop=taskcontext.getLocalProperty(&quot;key2&quot;),</span>
<span class="sd"> ... cpus=taskcontext.cpus())</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; spark.sparkContext.setLocalProperty(&quot;key2&quot;, &quot;value&quot;)</span>
<span class="sd"> &gt;&gt;&gt; [(anum, partid, stageid, taskaid, prop, cpus)] = (</span>
<span class="sd"> ... spark.range(1).select(taskcontext_as_row()).first()</span>
<span class="sd"> ... )</span>
<span class="sd"> &gt;&gt;&gt; isinstance(anum, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(partid, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(stageid, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskaid, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; prop</span>
<span class="sd"> &#39;value&#39;</span>
<span class="sd"> &gt;&gt;&gt; isinstance(cpus, int)</span>
<span class="sd"> True</span>
<span class="sd"> Get a task context instance from a dataframe via Pandas UDF.</span>
<span class="sd"> &gt;&gt;&gt; import pandas as pd # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.sql.functions import pandas_udf</span>
<span class="sd"> &gt;&gt;&gt; @pandas_udf(&quot;STRUCT&lt;&quot;</span>
<span class="sd"> ... &quot;anum: INT, partid: INT, stageid: INT, taskaid: INT, prop: STRING, cpus: INT&gt;&quot;)</span>
<span class="sd"> ... def taskcontext_as_row(_):</span>
<span class="sd"> ... taskcontext = TaskContext.get()</span>
<span class="sd"> ... return pd.DataFrame({</span>
<span class="sd"> ... &quot;anum&quot;: [taskcontext.attemptNumber()],</span>
<span class="sd"> ... &quot;partid&quot;: [taskcontext.partitionId()],</span>
<span class="sd"> ... &quot;stageid&quot;: [taskcontext.stageId()],</span>
<span class="sd"> ... &quot;taskaid&quot;: [taskcontext.taskAttemptId()],</span>
<span class="sd"> ... &quot;prop&quot;: [taskcontext.getLocalProperty(&quot;key3&quot;)],</span>
<span class="sd"> ... &quot;cpus&quot;: [taskcontext.cpus()]</span>
<span class="sd"> ... }) # doctest: +SKIP</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; spark.sparkContext.setLocalProperty(&quot;key3&quot;, &quot;value&quot;) # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; [(anum, partid, stageid, taskaid, prop, cpus)] = (</span>
<span class="sd"> ... spark.range(1).select(taskcontext_as_row(&quot;id&quot;)).first()</span>
<span class="sd"> ... ) # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; isinstance(anum, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(partid, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(stageid, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; isinstance(taskaid, int)</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; prop</span>
<span class="sd"> &#39;value&#39;</span>
<span class="sd"> &gt;&gt;&gt; isinstance(cpus, int)</span>
<span class="sd"> True</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">_taskContext</span><span class="p">:</span> <span class="n">ClassVar</span><span class="p">[</span><span class="n">Optional</span><span class="p">[</span><span class="s2">&quot;TaskContext&quot;</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_attemptNumber</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_partitionId</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_stageId</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_taskAttemptId</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_localProperties</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_cpus</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_resources</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">ResourceInformation</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="fm">__new__</span><span class="p">(</span><span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;TaskContext&quot;</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;TaskContext&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Even if users construct :class:`TaskContext` instead of using get, give them the singleton.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">taskContext</span> <span class="o">=</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span>
<span class="k">if</span> <span class="n">taskContext</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">taskContext</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span> <span class="o">=</span> <span class="n">taskContext</span> <span class="o">=</span> <span class="nb">object</span><span class="o">.</span><span class="fm">__new__</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span>
<span class="k">return</span> <span class="n">taskContext</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_getOrCreate</span><span class="p">(</span><span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;TaskContext&quot;</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;TaskContext&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Internal function to get or create global :class:`TaskContext`.&quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span> <span class="o">=</span> <span class="n">TaskContext</span><span class="p">()</span>
<span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_setTaskContext</span><span class="p">(</span><span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;TaskContext&quot;</span><span class="p">],</span> <span class="n">taskContext</span><span class="p">:</span> <span class="s2">&quot;TaskContext&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span> <span class="o">=</span> <span class="n">taskContext</span>
<div class="viewcode-block" id="TaskContext.get"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.get.html#pyspark.TaskContext.get">[docs]</a> <span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">get</span><span class="p">(</span><span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;TaskContext&quot;</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="s2">&quot;TaskContext&quot;</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return the currently active :class:`TaskContext`. This can be called inside of</span>
<span class="sd"> user functions to access contextual information about running tasks.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> :class:`TaskContext`, optional</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> Must be called on the worker, not the driver. Returns ``None`` if not initialized.</span>
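<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> A minimal sketch: on the driver, where no task is running, there is no active context.</span>
<span class="sd"> &gt;&gt;&gt; TaskContext.get() is None # doctest: +SKIP</span>
<span class="sd"> True</span>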
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span></div>
<div class="viewcode-block" id="TaskContext.stageId"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.stageId.html#pyspark.TaskContext.stageId">[docs]</a> <span class="k">def</span> <span class="nf">stageId</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">int</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> The ID of the stage that this task belongs to.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> int</span>
<span class="sd"> current stage id.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_stageId</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskContext.partitionId"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.partitionId.html#pyspark.TaskContext.partitionId">[docs]</a> <span class="k">def</span> <span class="nf">partitionId</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">int</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> The ID of the RDD partition that is computed by this task.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> int</span>
<span class="sd"> current partition id.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_partitionId</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskContext.attemptNumber"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.attemptNumber.html#pyspark.TaskContext.attemptNumber">[docs]</a> <span class="k">def</span> <span class="nf">attemptNumber</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">int</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> How many times this task has been attempted. The first task attempt will be assigned</span>
<span class="sd"> attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> int</span>
<span class="sd"> current attempt number.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_attemptNumber</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskContext.taskAttemptId"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.taskAttemptId.html#pyspark.TaskContext.taskAttemptId">[docs]</a> <span class="k">def</span> <span class="nf">taskAttemptId</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">int</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> An ID that is unique to this task attempt (within the same :class:`SparkContext`,</span>
<span class="sd"> no two task attempts will share the same attempt ID). This is roughly equivalent</span>
<span class="sd"> to Hadoop&#39;s `TaskAttemptID`.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> int</span>
<span class="sd"> current task attempt id.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_taskAttemptId</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskContext.getLocalProperty"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.getLocalProperty.html#pyspark.TaskContext.getLocalProperty">[docs]</a> <span class="k">def</span> <span class="nf">getLocalProperty</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Get a local property set upstream in the driver, or None if it is missing.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> key : str</span>
<span class="sd"> the key of the local property to get.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> str or None</span>
<span class="sd"> the value of the local property, or None if it is missing.</span>
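<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> A minimal sketch; ``&quot;key&quot;`` is just an illustrative property name set on the driver.</span>
<span class="sd"> &gt;&gt;&gt; spark.sparkContext.setLocalProperty(&quot;key&quot;, &quot;value&quot;)</span>
<span class="sd"> &gt;&gt;&gt; taskcontext = spark.sparkContext.parallelize([1]).map(lambda _: TaskContext.get()).first()</span>
<span class="sd"> &gt;&gt;&gt; taskcontext.getLocalProperty(&quot;key&quot;)</span>
<span class="sd"> &#39;value&#39;</span>
<span class="sd"> &gt;&gt;&gt; taskcontext.getLocalProperty(&quot;not_set&quot;) is None</span>
<span class="sd"> True</span>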
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">],</span> <span class="bp">self</span><span class="o">.</span><span class="n">_localProperties</span><span class="p">)</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskContext.cpus"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.cpus.html#pyspark.TaskContext.cpus">[docs]</a> <span class="k">def</span> <span class="nf">cpus</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">int</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> CPUs allocated to the task.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> int</span>
<span class="sd"> the number of CPUs.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_cpus</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskContext.resources"><a class="viewcode-back" href="../../reference/api/pyspark.TaskContext.resources.html#pyspark.TaskContext.resources">[docs]</a> <span class="k">def</span> <span class="nf">resources</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">ResourceInformation</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Resources allocated to the task. The key is the resource name and the value is information</span>
<span class="sd"> about the resource.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> dict</span>
<span class="sd"> a dictionary mapping string resource names to :class:`ResourceInformation`.</span>
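<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> A minimal sketch (not run): the ``&quot;gpu&quot;`` resource name is an assumption and is only</span>
<span class="sd"> available when the cluster is configured with GPU task resources.</span>
<span class="sd"> &gt;&gt;&gt; def gpu_addresses(itr):</span>
<span class="sd"> ... gpus = TaskContext.get().resources().get(&quot;gpu&quot;)</span>
<span class="sd"> ... yield gpus.addresses if gpus is not None else []</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; spark.sparkContext.parallelize([1], 1).mapPartitions(gpu_addresses).collect() # doctest: +SKIP</span>
<span class="sd"> [[&#39;0&#39;]]</span>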
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">cast</span><span class="p">(</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">ResourceInformation</span><span class="p">],</span> <span class="bp">self</span><span class="o">.</span><span class="n">_resources</span><span class="p">)</span></div></div>
<span class="n">BARRIER_FUNCTION</span> <span class="o">=</span> <span class="mi">1</span>
<span class="n">ALL_GATHER_FUNCTION</span> <span class="o">=</span> <span class="mi">2</span>
<span class="k">def</span> <span class="nf">_load_from_socket</span><span class="p">(</span>
<span class="n">port</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">int</span><span class="p">]],</span>
<span class="n">auth_secret</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">function</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">all_gather_message</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Load data from a given socket. This is a blocking method, thus it only returns when the</span>
<span class="sd"> socket connection has been closed.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="p">(</span><span class="n">sockfile</span><span class="p">,</span> <span class="n">sock</span><span class="p">)</span> <span class="o">=</span> <span class="n">local_connect_and_auth</span><span class="p">(</span><span class="n">port</span><span class="p">,</span> <span class="n">auth_secret</span><span class="p">)</span>
<span class="c1"># The call may block forever, so no timeout</span>
<span class="n">sock</span><span class="o">.</span><span class="n">settimeout</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span>
<span class="k">if</span> <span class="n">function</span> <span class="o">==</span> <span class="n">BARRIER_FUNCTION</span><span class="p">:</span>
<span class="c1"># Make a barrier() function call.</span>
<span class="n">write_int</span><span class="p">(</span><span class="n">function</span><span class="p">,</span> <span class="n">sockfile</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">function</span> <span class="o">==</span> <span class="n">ALL_GATHER_FUNCTION</span><span class="p">:</span>
<span class="c1"># Make an all_gather() function call.</span>
<span class="n">write_int</span><span class="p">(</span><span class="n">function</span><span class="p">,</span> <span class="n">sockfile</span><span class="p">)</span>
<span class="n">write_with_length</span><span class="p">(</span><span class="n">cast</span><span class="p">(</span><span class="nb">str</span><span class="p">,</span> <span class="n">all_gather_message</span><span class="p">)</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s2">&quot;utf-8&quot;</span><span class="p">),</span> <span class="n">sockfile</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;Unrecognized function type&quot;</span><span class="p">)</span>
<span class="n">sockfile</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span>
<span class="c1"># Collect result.</span>
<span class="n">length</span> <span class="o">=</span> <span class="n">read_int</span><span class="p">(</span><span class="n">sockfile</span><span class="p">)</span>
<span class="n">res</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="n">length</span><span class="p">):</span>
<span class="n">res</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">UTF8Deserializer</span><span class="p">()</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">sockfile</span><span class="p">))</span>
<span class="c1"># Release resources.</span>
<span class="n">sockfile</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="n">sock</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="k">return</span> <span class="n">res</span>
<div class="viewcode-block" id="BarrierTaskContext"><a class="viewcode-back" href="../../reference/api/pyspark.BarrierTaskContext.html#pyspark.BarrierTaskContext">[docs]</a><span class="k">class</span> <span class="nc">BarrierTaskContext</span><span class="p">(</span><span class="n">TaskContext</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A :class:`TaskContext` with extra contextual info and tooling for tasks in a barrier stage.</span>
<span class="sd"> Use :func:`BarrierTaskContext.get` to obtain the barrier context for a running barrier task.</span>
<span class="sd"> .. versionadded:: 2.4.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is experimental</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> Set a barrier, and execute it with RDD.</span>
<span class="sd"> &gt;&gt;&gt; from pyspark import BarrierTaskContext</span>
<span class="sd"> &gt;&gt;&gt; def block_and_do_something(itr):</span>
<span class="sd"> ... taskcontext = BarrierTaskContext.get()</span>
<span class="sd"> ... # Do something.</span>
<span class="sd"> ...</span>
<span class="sd"> ... # Wait until all tasks finished.</span>
<span class="sd"> ... taskcontext.barrier()</span>
<span class="sd"> ...</span>
<span class="sd"> ... return itr</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; rdd = spark.sparkContext.parallelize([1])</span>
<span class="sd"> &gt;&gt;&gt; rdd.barrier().mapPartitions(block_and_do_something).collect()</span>
<span class="sd"> [1]</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">_port</span><span class="p">:</span> <span class="n">ClassVar</span><span class="p">[</span><span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">int</span><span class="p">]]]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">_secret</span><span class="p">:</span> <span class="n">ClassVar</span><span class="p">[</span><span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_getOrCreate</span><span class="p">(</span><span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Internal function to get or create global :class:`BarrierTaskContext`. We need to make sure</span>
<span class="sd"> :class:`BarrierTaskContext` is returned from here because it is needed in the Python worker</span>
<span class="sd"> reuse scenario; see SPARK-25921 for more details.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span><span class="p">,</span> <span class="n">BarrierTaskContext</span><span class="p">):</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span> <span class="o">=</span> <span class="nb">object</span><span class="o">.</span><span class="fm">__new__</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span>
<div class="viewcode-block" id="BarrierTaskContext.get"><a class="viewcode-back" href="../../reference/api/pyspark.BarrierTaskContext.get.html#pyspark.BarrierTaskContext.get">[docs]</a> <span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">get</span><span class="p">(</span><span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return the currently active :class:`BarrierTaskContext`.</span>
<span class="sd"> This can be called inside of user functions to access contextual information about</span>
<span class="sd"> running tasks.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> Must be called on the worker, not the driver. Returns ``None`` if not initialized.</span>
<span class="sd"> An exception will be raised if it is not in a barrier stage.</span>
<span class="sd"> This API is experimental</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span><span class="p">,</span> <span class="n">BarrierTaskContext</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;NOT_IN_BARRIER_STAGE&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{},</span>
<span class="p">)</span>
<span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_taskContext</span></div>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_initialize</span><span class="p">(</span>
<span class="bp">cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">],</span> <span class="n">port</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">int</span><span class="p">]],</span> <span class="n">secret</span><span class="p">:</span> <span class="nb">str</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Initialize :class:`BarrierTaskContext`; other methods within :class:`BarrierTaskContext`</span>
<span class="sd"> can only be called after BarrierTaskContext is initialized.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_port</span> <span class="o">=</span> <span class="n">port</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_secret</span> <span class="o">=</span> <span class="n">secret</span>
<div class="viewcode-block" id="BarrierTaskContext.barrier"><a class="viewcode-back" href="../../reference/api/pyspark.BarrierTaskContext.barrier.html#pyspark.BarrierTaskContext.barrier">[docs]</a> <span class="k">def</span> <span class="nf">barrier</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Sets a global barrier and waits until all tasks in this stage hit this barrier.</span>
<span class="sd"> Similar to `MPI_Barrier` function in MPI, this function blocks until all tasks</span>
<span class="sd"> in the same stage have reached this routine.</span>
<span class="sd"> .. versionadded:: 2.4.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is experimental</span>
<span class="sd"> In a barrier stage, each task must have the same number of `barrier()`</span>
<span class="sd"> calls, in all possible code branches. Otherwise, you may get the job hanging</span>
<span class="sd"> or a `SparkException` after timeout.</span>
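<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> A minimal sketch (not run): tasks reach the barrier at different times but leave it together.</span>
<span class="sd"> &gt;&gt;&gt; import time</span>
<span class="sd"> &gt;&gt;&gt; from pyspark import BarrierTaskContext</span>
<span class="sd"> &gt;&gt;&gt; def wait_for_all(itr):</span>
<span class="sd"> ... taskcontext = BarrierTaskContext.get()</span>
<span class="sd"> ... time.sleep(taskcontext.partitionId()) # tasks arrive at different times</span>
<span class="sd"> ... taskcontext.barrier() # every task blocks here until all have arrived</span>
<span class="sd"> ... return itr</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; rdd = spark.sparkContext.parallelize(range(4), 2)</span>
<span class="sd"> &gt;&gt;&gt; rdd.barrier().mapPartitions(wait_for_all).collect() # doctest: +SKIP</span>
<span class="sd"> [0, 1, 2, 3]</span>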
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_port</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">_secret</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;CALL_BEFORE_INITIALIZE&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;func_name&quot;</span><span class="p">:</span> <span class="s2">&quot;barrier&quot;</span><span class="p">,</span>
<span class="s2">&quot;object&quot;</span><span class="p">:</span> <span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">,</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">_load_from_socket</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_port</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_secret</span><span class="p">,</span> <span class="n">BARRIER_FUNCTION</span><span class="p">)</span></div>
<div class="viewcode-block" id="BarrierTaskContext.allGather"><a class="viewcode-back" href="../../reference/api/pyspark.BarrierTaskContext.allGather.html#pyspark.BarrierTaskContext.allGather">[docs]</a> <span class="k">def</span> <span class="nf">allGather</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">message</span><span class="p">:</span> <span class="nb">str</span> <span class="o">=</span> <span class="s2">&quot;&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> This function blocks until all tasks in the same stage have reached this routine.</span>
<span class="sd"> Each task passes in a message and returns with a list of all the messages passed in</span>
<span class="sd"> by each of those tasks.</span>
<span class="sd"> .. versionadded:: 3.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is experimental</span>
<span class="sd"> In a barrier stage, each task must have the same number of `allGather()`</span>
<span class="sd"> calls, in all possible code branches. Otherwise, you may get the job hanging</span>
<span class="sd"> or a `SparkException` after timeout.</span>
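<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> A minimal sketch (not run): each task contributes its partition ID and gets back the</span>
<span class="sd"> messages from every task in the stage.</span>
<span class="sd"> &gt;&gt;&gt; from pyspark import BarrierTaskContext</span>
<span class="sd"> &gt;&gt;&gt; def gather_partition_ids(itr):</span>
<span class="sd"> ... taskcontext = BarrierTaskContext.get()</span>
<span class="sd"> ... return [taskcontext.allGather(str(taskcontext.partitionId()))]</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; rdd = spark.sparkContext.parallelize(range(4), 2)</span>
<span class="sd"> &gt;&gt;&gt; rdd.barrier().mapPartitions(gather_partition_ids).first() # doctest: +SKIP</span>
<span class="sd"> [&#39;0&#39;, &#39;1&#39;]</span>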
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">message</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;Argument `message` must be of type `str`&quot;</span><span class="p">)</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_port</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">_secret</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;CALL_BEFORE_INITIALIZE&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;func_name&quot;</span><span class="p">:</span> <span class="s2">&quot;allGather&quot;</span><span class="p">,</span>
<span class="s2">&quot;object&quot;</span><span class="p">:</span> <span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">,</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">_load_from_socket</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_port</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_secret</span><span class="p">,</span> <span class="n">ALL_GATHER_FUNCTION</span><span class="p">,</span> <span class="n">message</span><span class="p">)</span></div>
<div class="viewcode-block" id="BarrierTaskContext.getTaskInfos"><a class="viewcode-back" href="../../reference/api/pyspark.BarrierTaskContext.getTaskInfos.html#pyspark.BarrierTaskContext.getTaskInfos">[docs]</a> <span class="k">def</span> <span class="nf">getTaskInfos</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">List</span><span class="p">[</span><span class="s2">&quot;BarrierTaskInfo&quot;</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns :class:`BarrierTaskInfo` for all tasks in this barrier stage,</span>
<span class="sd"> ordered by partition ID.</span>
<span class="sd"> .. versionadded:: 2.4.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is experimental</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; from pyspark import BarrierTaskContext</span>
<span class="sd"> &gt;&gt;&gt; rdd = spark.sparkContext.parallelize([1])</span>
<span class="sd"> &gt;&gt;&gt; barrier_info = rdd.barrier().mapPartitions(</span>
<span class="sd"> ... lambda _: [BarrierTaskContext.get().getTaskInfos()]).collect()[0][0]</span>
<span class="sd"> &gt;&gt;&gt; barrier_info.address</span>
<span class="sd"> &#39;...:...&#39;</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_port</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">_secret</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;CALL_BEFORE_INITIALIZE&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;func_name&quot;</span><span class="p">:</span> <span class="s2">&quot;getTaskInfos&quot;</span><span class="p">,</span>
<span class="s2">&quot;object&quot;</span><span class="p">:</span> <span class="s2">&quot;BarrierTaskContext&quot;</span><span class="p">,</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">addresses</span> <span class="o">=</span> <span class="n">cast</span><span class="p">(</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">],</span> <span class="bp">self</span><span class="o">.</span><span class="n">_localProperties</span><span class="p">)</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">&quot;addresses&quot;</span><span class="p">,</span> <span class="s2">&quot;&quot;</span><span class="p">)</span>
<span class="k">return</span> <span class="p">[</span><span class="n">BarrierTaskInfo</span><span class="p">(</span><span class="n">h</span><span class="o">.</span><span class="n">strip</span><span class="p">())</span> <span class="k">for</span> <span class="n">h</span> <span class="ow">in</span> <span class="n">addresses</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s2">&quot;,&quot;</span><span class="p">)]</span></div></div>
<div class="viewcode-block" id="BarrierTaskInfo"><a class="viewcode-back" href="../../reference/api/pyspark.BarrierTaskInfo.html#pyspark.BarrierTaskInfo">[docs]</a><span class="k">class</span> <span class="nc">BarrierTaskInfo</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Carries all task infos of a barrier task.</span>
<span class="sd"> .. versionadded:: 2.4.0</span>
<span class="sd"> Attributes</span>
<span class="sd"> ----------</span>
<span class="sd"> address : str</span>
<span class="sd"> The IPv4 address (host:port) of the executor that the barrier task is running on</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is experimental</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">address</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">address</span> <span class="o">=</span> <span class="n">address</span></div>
<span class="k">def</span> <span class="nf">_test</span><span class="p">()</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">doctest</span>
<span class="kn">import</span> <span class="nn">sys</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span>
<span class="n">globs</span> <span class="o">=</span> <span class="nb">globals</span><span class="p">()</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;spark&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">SparkSession</span><span class="o">.</span><span class="n">builder</span><span class="o">.</span><span class="n">master</span><span class="p">(</span><span class="s2">&quot;local[2]&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">appName</span><span class="p">(</span><span class="s2">&quot;taskcontext tests&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">getOrCreate</span><span class="p">()</span>
<span class="p">)</span>
<span class="p">(</span><span class="n">failure_count</span><span class="p">,</span> <span class="n">test_count</span><span class="p">)</span> <span class="o">=</span> <span class="n">doctest</span><span class="o">.</span><span class="n">testmod</span><span class="p">(</span><span class="n">globs</span><span class="o">=</span><span class="n">globs</span><span class="p">,</span> <span class="n">optionflags</span><span class="o">=</span><span class="n">doctest</span><span class="o">.</span><span class="n">ELLIPSIS</span><span class="p">)</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;spark&quot;</span><span class="p">]</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="k">if</span> <span class="n">failure_count</span><span class="p">:</span>
<span class="n">sys</span><span class="o">.</span><span class="n">exit</span><span class="p">(</span><span class="o">-</span><span class="mi">1</span><span class="p">)</span>
<span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">&quot;__main__&quot;</span><span class="p">:</span>
<span class="n">_test</span><span class="p">()</span>
</pre></div>
</div>
<!-- Previous / next buttons -->
<div class='prev-next-area'>
</div>
</main>
</div>
</div>
<script src="../../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf"></script>
<footer class="footer mt-5 mt-md-0">
<div class="container">
<div class="footer-item">
<p class="copyright">
&copy; Copyright .<br>
</p>
</div>
<div class="footer-item">
<p class="sphinx-version">
Created using <a href="http://sphinx-doc.org/">Sphinx</a> 3.0.4.<br>
</p>
</div>
</div>
</footer>
</body>
</html>