Source code for pyspark.accumulators (PySpark 3.5.3 documentation)

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
import select
import struct
import socketserver as SocketServer
import threading
from typing import Callable, Dict, Generic, Tuple, Type, TYPE_CHECKING, TypeVar, Union

from pyspark.serializers import read_int, CPickleSerializer
from pyspark.errors import PySparkRuntimeError

if TYPE_CHECKING:
    from pyspark._typing import SupportsIAdd  # noqa: F401
    import socketserver.BaseRequestHandler  # type: ignore[import]

__all__ = ["Accumulator", "AccumulatorParam"]

T = TypeVar("T")
U = TypeVar("U", bound="SupportsIAdd")

pickleSer = CPickleSerializer()

# Holds accumulators registered on the current machine, keyed by ID. This is then used to send
# the local accumulator updates back to the driver program at the end of a task.
_accumulatorRegistry: Dict[int, "Accumulator"] = {}


def _deserialize_accumulator(
    aid: int, zero_value: T, accum_param: "AccumulatorParam[T]"
) -> "Accumulator[T]":
    from pyspark.accumulators import _accumulatorRegistry

    # If this certain accumulator was deserialized, don't overwrite it.
    if aid in _accumulatorRegistry:
        return _accumulatorRegistry[aid]
    else:
        accum = Accumulator(aid, zero_value, accum_param)
        accum._deserialized = True
        _accumulatorRegistry[aid] = accum
        return accum
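
# How this fits together: pickling an Accumulator (see Accumulator.__reduce__ below)
# ships only (aid, zero value, accum_param), so each worker task rebuilds a local,
# zero-valued copy here with _deserialized = True. The worker's local additions are
# sent back to the driver keyed by aid at the end of a task (see _UpdateRequestHandler
# below), where they are merged into the original accumulator via `+=`.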


class Accumulator(Generic[T]):
    """
    A shared variable that can be accumulated, i.e., has a commutative and associative "add"
    operation. Worker tasks on a Spark cluster can add values to an Accumulator with the `+=`
    operator, but only the driver program is allowed to access its value, using `value`.
    Updates from the workers get propagated automatically to the driver program.

    While :class:`SparkContext` supports accumulators for primitive data types like :class:`int` and
    :class:`float`, users can also define accumulators for custom types by providing a custom
    :py:class:`AccumulatorParam` object. Refer to its doctest for an example.

    Examples
    --------
    >>> a = sc.accumulator(1)
    >>> a.value
    1
    >>> a.value = 2
    >>> a.value
    2
    >>> a += 5
    >>> a.value
    7
    >>> sc.accumulator(1.0).value
    1.0
    >>> sc.accumulator(1j).value
    1j
    >>> rdd = sc.parallelize([1,2,3])
    >>> def f(x):
    ...     global a
    ...     a += x
    ...
    >>> rdd.foreach(f)
    >>> a.value
    13
    >>> b = sc.accumulator(0)
    >>> def g(x):
    ...     b.add(x)
    ...
    >>> rdd.foreach(g)
    >>> b.value
    6

    >>> rdd.map(lambda x: a.value).collect() # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
        ...
    Py4JJavaError: ...
    >>> def h(x):
    ...     global a
    ...     a.value = 7
    ...
    >>> rdd.foreach(h) # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
        ...
    Py4JJavaError: ...

    >>> sc.accumulator([1.0, 2.0, 3.0]) # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
        ...
    TypeError: ...
    """

    def __init__(self, aid: int, value: T, accum_param: "AccumulatorParam[T]"):
        """Create a new Accumulator with a given initial value and AccumulatorParam object"""
        from pyspark.accumulators import _accumulatorRegistry

        self.aid = aid
        self.accum_param = accum_param
        self._value = value
        self._deserialized = False
        _accumulatorRegistry[aid] = self

    def __reduce__(
        self,
    ) -> Tuple[
        Callable[[int, T, "AccumulatorParam[T]"], "Accumulator[T]"],
        Tuple[int, T, "AccumulatorParam[T]"],
    ]:
        """Custom serialization; saves the zero value from our AccumulatorParam"""
        param = self.accum_param
        return (_deserialize_accumulator, (self.aid, param.zero(self._value), param))

    @property
    def value(self) -> T:
        """Get the accumulator's value; only usable in driver program"""
        if self._deserialized:
            raise PySparkRuntimeError(
                error_class="VALUE_NOT_ACCESSIBLE",
                message_parameters={
                    "value": "Accumulator.value",
                },
            )
        return self._value

    @value.setter
    def value(self, value: T) -> None:
        """Sets the accumulator's value; only usable in driver program"""
        if self._deserialized:
            raise PySparkRuntimeError(
                error_class="VALUE_NOT_ACCESSIBLE",
                message_parameters={
                    "value": "Accumulator.value",
                },
            )
        self._value = value

    def add(self, term: T) -> None:
        """Adds a term to this accumulator's value"""
        self._value = self.accum_param.addInPlace(self._value, term)

    def __iadd__(self, term: T) -> "Accumulator[T]":
        """The += operator; adds a term to this accumulator's value"""
        self.add(term)
        return self

    def __str__(self) -> str:
        return str(self._value)

    def __repr__(self) -> str:
        return "Accumulator<id=%i, value=%s>" % (self.aid, self._value)


class AccumulatorParam(Generic[T]):
    """
    Helper object that defines how to accumulate values of a given type.

    Examples
    --------
    >>> from pyspark.accumulators import AccumulatorParam
    >>> class VectorAccumulatorParam(AccumulatorParam):
    ...     def zero(self, value):
    ...         return [0.0] * len(value)
    ...     def addInPlace(self, val1, val2):
    ...         for i in range(len(val1)):
    ...             val1[i] += val2[i]
    ...         return val1
    >>> va = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam())
    >>> va.value
    [1.0, 2.0, 3.0]
    >>> def g(x):
    ...     global va
    ...     va += [x] * 3
    ...
    >>> rdd = sc.parallelize([1,2,3])
    >>> rdd.foreach(g)
    >>> va.value
    [7.0, 8.0, 9.0]
    """

    def zero(self, value: T) -> T:
        """
        Provide a "zero value" for the type, compatible in dimensions with the
        provided `value` (e.g., a zero vector)
        """
        raise NotImplementedError

    def addInPlace(self, value1: T, value2: T) -> T:
        """
        Add two values of the accumulator's data type, returning a new value;
        for efficiency, can also update `value1` in place and return it.
        """
        raise NotImplementedError


class AddingAccumulatorParam(AccumulatorParam[U]):
    """
    An AccumulatorParam that uses the + operators to add values. Designed for simple types
    such as integers, floats, and lists. Requires the zero value for the underlying type
    as a parameter.
    """

    def __init__(self, zero_value: U):
        self.zero_value = zero_value

    def zero(self, value: U) -> U:
        return self.zero_value

    def addInPlace(self, value1: U, value2: U) -> U:
        value1 += value2  # type: ignore[operator]
        return value1


# Singleton accumulator params for some standard types
INT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0)  # type: ignore[type-var]
FLOAT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0)  # type: ignore[type-var]
COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)  # type: ignore[type-var]
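
# Illustrative sketch (not part of the module): AddingAccumulatorParam simply delegates
# to `+=`, so the singletons above can be exercised directly, without a SparkContext.
# The `aid` below is an arbitrary id made up only for this local demonstration.
#
#   acc = Accumulator(aid=-1, value=0, accum_param=INT_ACCUMULATOR_PARAM)
#   acc.add(3)               # addInPlace(0, 3) -> 3
#   acc += 4                 # __iadd__ -> add -> 7
#   assert acc.value == 7    # allowed: this copy was not deserialized on a worker
#
# In normal use the driver calls sc.accumulator(initial_value), which falls back to one
# of these default params for int, float, and complex initial values (unsupported types
# raise the TypeError shown in the Accumulator doctest); a custom AccumulatorParam can
# be passed explicitly, as in the VectorAccumulatorParam doctest above.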


class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
    """
    This handler will keep polling updates from the same socket until the
    server is shutdown.
    """

    def handle(self) -> None:
        from pyspark.accumulators import _accumulatorRegistry

        auth_token = self.server.auth_token  # type: ignore[attr-defined]

        def poll(func: Callable[[], bool]) -> None:
            while not self.server.server_shutdown:  # type: ignore[attr-defined]
                # Poll every 1 second for new data -- don't block in case of shutdown.
                r, _, _ = select.select([self.rfile], [], [], 1)
                if self.rfile in r and func():
                    break

        def accum_updates() -> bool:
            num_updates = read_int(self.rfile)
            for _ in range(num_updates):
                (aid, update) = pickleSer._read_with_length(self.rfile)
                _accumulatorRegistry[aid] += update
            # Write a byte in acknowledgement
            self.wfile.write(struct.pack("!b", 1))
            return False

        def authenticate_and_accum_updates() -> bool:
            received_token: Union[bytes, str] = self.rfile.read(len(auth_token))
            if isinstance(received_token, bytes):
                received_token = received_token.decode("utf-8")
            if received_token == auth_token:
                accum_updates()
                # we've authenticated, we can break out of the first loop now
                return True
            else:
                raise ValueError(
                    "The value of the provided token to the AccumulatorServer is not correct."
                )

        # first we keep polling till we've received the authentication token
        poll(authenticate_and_accum_updates)
        # now we've authenticated, don't need to check for the token anymore
        poll(accum_updates)
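
# Wire protocol expected by the handler above, as implied by the reads and writes it
# performs (the framing helpers themselves live in pyspark.serializers): the client
# first sends the auth token as raw UTF-8 bytes, then any number of update batches.
# Each batch is a 4-byte big-endian count followed by that many pickled (aid, update)
# pairs, each pair itself prefixed by a 4-byte big-endian length; the server answers
# every batch with a single acknowledgement byte (b"\x01").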


class AccumulatorServer(SocketServer.TCPServer):
    def __init__(
        self,
        server_address: Tuple[str, int],
        RequestHandlerClass: Type["socketserver.BaseRequestHandler"],
        auth_token: str,
    ):
        SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)
        self.auth_token = auth_token

    """
    A simple TCP server that intercepts shutdown() in order to interrupt
    our continuous polling on the handler.
    """

    server_shutdown = False

    def shutdown(self) -> None:
        self.server_shutdown = True
        SocketServer.TCPServer.shutdown(self)
        self.server_close()


def _start_update_server(auth_token: str) -> AccumulatorServer:
    """Start a TCP server to receive accumulator updates in a daemon thread, and returns it"""
    server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler, auth_token)
    thread = threading.Thread(target=server.serve_forever)
    thread.daemon = True
    thread.start()
    return server
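
# Illustrative sketch (not part of the module): exercising the server locally with a raw
# socket, assuming the framing described above (4-byte big-endian lengths, pickled
# (aid, update) tuples). The token, aid, and names here are made up for the example; in
# real use the driver's SparkContext starts this server and the JVM sends the updates.
#
#   import pickle, socket
#
#   acc = Accumulator(aid=0, value=0, accum_param=INT_ACCUMULATOR_PARAM)
#   server = _start_update_server(auth_token="secret")
#   with socket.create_connection(server.server_address) as sock:
#       sock.sendall(b"secret")                            # authenticate first
#       payload = pickle.dumps((0, 5))                     # one update: aid 0, add 5
#       sock.sendall(struct.pack("!i", 1))                 # number of updates in the batch
#       sock.sendall(struct.pack("!i", len(payload)) + payload)
#       ack = sock.recv(1)                                 # b"\x01" acknowledgement
#   server.shutdown()
#   assert acc.value == 5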


if __name__ == "__main__":
    import doctest
    from pyspark.context import SparkContext

    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs["sc"] = SparkContext("local", "test")
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs["sc"].stop()
    if failure_count:
        sys.exit(-1)