blob: 9e258d9807aace8370ee13b75025839ccabfaede [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<title>pyspark.broadcast &#8212; PySpark 3.5.3 documentation</title>
<link href="../../_static/styles/theme.css?digest=1999514e3f237ded88cf" rel="stylesheet">
<link href="../../_static/styles/pydata-sphinx-theme.css?digest=1999514e3f237ded88cf" rel="stylesheet">
<link rel="stylesheet"
href="../../_static/vendor/fontawesome/5.13.0/css/all.min.css">
<link rel="preload" as="font" type="font/woff2" crossorigin
href="../../_static/vendor/fontawesome/5.13.0/webfonts/fa-solid-900.woff2">
<link rel="preload" as="font" type="font/woff2" crossorigin
href="../../_static/vendor/fontawesome/5.13.0/webfonts/fa-brands-400.woff2">
<link rel="stylesheet" href="../../_static/styles/pydata-sphinx-theme.css" type="text/css" />
<link rel="stylesheet" href="../../_static/pygments.css" type="text/css" />
<link rel="stylesheet" type="text/css" href="../../_static/copybutton.css" />
<link rel="stylesheet" type="text/css" href="../../_static/css/pyspark.css" />
<link rel="preload" as="script" href="../../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf">
<script id="documentation_options" data-url_root="../../" src="../../_static/documentation_options.js"></script>
<script src="../../_static/jquery.js"></script>
<script src="../../_static/underscore.js"></script>
<script src="../../_static/doctools.js"></script>
<script src="../../_static/language_data.js"></script>
<script src="../../_static/clipboard.min.js"></script>
<script src="../../_static/copybutton.js"></script>
<script crossorigin="anonymous" integrity="sha256-Ae2Vz/4ePdIu6ZyI/5ZGsYnb+m0JlOmKPjt6XZ9JJkA=" src="https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.4/require.min.js"></script>
<script async="async" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
<script type="text/x-mathjax-config">MathJax.Hub.Config({"tex2jax": {"inlineMath": [["$", "$"], ["\\(", "\\)"]], "processEscapes": true, "ignoreClass": "document", "processClass": "math|output_area"}})</script>
<link rel="canonical" href="https://spark.apache.org/docs/latest/api/python/_modules/pyspark/broadcast.html" />
<link rel="search" title="Search" href="../../search.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="en">
<!-- Google Analytics -->
</head>
<body data-spy="scroll" data-target="#bd-toc-nav" data-offset="80">
<div class="container-fluid" id="banner"></div>
<nav class="navbar navbar-light navbar-expand-lg bg-light fixed-top bd-navbar" id="navbar-main"><div class="container-xl">
<div id="navbar-start">
<a class="navbar-brand" href="../../index.html">
<img src="../../_static/spark-logo-reverse.png" class="logo" alt="PySpark documentation home">
</a>
</div>
<button class="navbar-toggler" type="button" data-toggle="collapse" data-target="#navbar-collapsible" aria-controls="navbar-collapsible" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div id="navbar-collapsible" class="col-lg-9 collapse navbar-collapse">
<div id="navbar-center" class="mr-auto">
<div class="navbar-center-item">
<ul id="navbar-main-elements" class="navbar-nav">
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../index.html">
Overview
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../user_guide/index.html">
User Guides
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../reference/index.html">
API Reference
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../development/index.html">
Development
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</div>
</div>
<div id="navbar-end">
<div class="navbar-end-item">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
<button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-toggle="dropdown">
3.5.3
<span class="caret"></span>
</button>
<div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
<!-- dropdown will be populated by javascript on page load -->
</div>
</div>
<script type="text/javascript">
/**
 * Build the docs homepage URL for one entry of the versions list.
 *
 * @param {Object} entry - an element of versions.json; its `version` field is
 *     substituted into the URL template.
 * @returns {string} the homepage URL of that docs version.
 */
function buildURL(entry) {
  // Template supplied by jinja; "{version}" is the placeholder to fill in.
  const urlTemplate = "https://spark.apache.org/docs/{version}/api/python/index.html";
  return urlTemplate.replace("{version}", entry.version);
}
/**
 * Click handler for a version-switcher link.
 *
 * Sends a HEAD request for the page equivalent to the current one in the
 * other docs version. If it succeeds, the browser navigates there; if it
 * fails, the browser navigates to that version's homepage (the link's href).
 *
 * The homepage URLs used by this switcher end in a file name ("index.html"),
 * so the current page's path is resolved relative to that URL with the URL
 * API. Plain string concatenation would produce
 * ".../index.html_modules/...", which never exists, and the switcher would
 * always land on the homepage.
 *
 * @param {Event} event - click event from the version-switcher anchor.
 * @returns {boolean} always false, to cancel the anchor's default navigation.
 */
function checkPageExistsAndRedirect(event) {
  // Path of this page relative to the docs root (supplied by jinja).
  const currentFilePath = "_modules/pyspark/broadcast.html";
  // currentTarget is the anchor the handler is bound to, even if the click
  // landed on a descendant node; fall back to target if it is unavailable.
  const link = event.currentTarget || event.target;
  const otherDocsHomepage = link.getAttribute("href");
  // Resolve against the homepage URL so its trailing file name is replaced.
  const tryUrl = new URL(currentFilePath, otherDocsHomepage).href;
  $.ajax({
    type: "HEAD",
    url: tryUrl,
    // if the page exists, go there
    success: function () {
      location.href = tryUrl;
    },
  }).fail(function () {
    // otherwise go to the other version's homepage
    location.href = otherDocsHomepage;
  });
  return false;
}
/**
 * Populate the version-switcher dropdown from the published versions list.
 *
 * Fetches versions.json and appends one link per entry to #version_switcher.
 * The links are created up front, before any per-page AJAX checks, so the
 * dropdown order follows the JSON order. Each link initially points at that
 * version's docs homepage; its click handler decides where to go.
 */
(function () {
  const VERSIONS_JSON_URL = "https://spark.apache.org/static/versions.json";
  const LINK_CLASSES = "list-group-item list-group-item-action py-1";

  $.getJSON(VERSIONS_JSON_URL, function (versions) {
    $.each(versions, function (_index, version) {
      // Entries with no custom display name fall back to their version string.
      if (!("name" in version)) {
        version.name = version.version;
      }
      version.url = buildURL(version);

      const link = document.createElement("a");
      link.setAttribute("class", LINK_CLASSES);
      link.setAttribute("href", `${version.url}`);
      link.textContent = `${version.name}`;
      link.onclick = checkPageExistsAndRedirect;
      $("#version_switcher").append(link);
    });
  });
})();
</script>
</div>
</div>
</div>
</div>
</nav>
<div class="container-xl">
<div class="row">
<!-- Only show if we have sidebars configured, else just a small margin -->
<div class="col-12 col-md-3 bd-sidebar">
<div class="sidebar-start-items"><form class="bd-search d-flex align-items-center" action="../../search.html" method="get">
<i class="icon fas fa-search"></i>
<input type="search" class="form-control" name="q" id="search-input" placeholder="Search the docs ..." aria-label="Search the docs ..." autocomplete="off" >
</form><nav class="bd-links" id="bd-docs-nav" aria-label="Main navigation">
<div class="bd-toc-item active">
</div>
</nav>
</div>
<div class="sidebar-end-items">
</div>
</div>
<div class="d-none d-xl-block col-xl-2 bd-toc">
</div>
<main class="col-12 col-md-9 col-xl-7 py-md-5 pl-md-5 pr-md-4 bd-content" role="main">
<div>
<h1>Source code for pyspark.broadcast</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="kn">import</span> <span class="nn">gc</span>
<span class="kn">import</span> <span class="nn">os</span>
<span class="kn">import</span> <span class="nn">sys</span>
<span class="kn">from</span> <span class="nn">tempfile</span> <span class="kn">import</span> <span class="n">NamedTemporaryFile</span>
<span class="kn">import</span> <span class="nn">threading</span>
<span class="kn">import</span> <span class="nn">pickle</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="p">(</span>
<span class="n">overload</span><span class="p">,</span>
<span class="n">Any</span><span class="p">,</span>
<span class="n">BinaryIO</span><span class="p">,</span>
<span class="n">Callable</span><span class="p">,</span>
<span class="n">Dict</span><span class="p">,</span>
<span class="n">Generic</span><span class="p">,</span>
<span class="n">IO</span><span class="p">,</span>
<span class="n">Iterator</span><span class="p">,</span>
<span class="n">Optional</span><span class="p">,</span>
<span class="n">Tuple</span><span class="p">,</span>
<span class="n">TypeVar</span><span class="p">,</span>
<span class="n">TYPE_CHECKING</span><span class="p">,</span>
<span class="n">Union</span><span class="p">,</span>
<span class="p">)</span>
<span class="kn">from</span> <span class="nn">pyspark.java_gateway</span> <span class="kn">import</span> <span class="n">local_connect_and_auth</span>
<span class="kn">from</span> <span class="nn">pyspark.serializers</span> <span class="kn">import</span> <span class="n">ChunkedStream</span><span class="p">,</span> <span class="n">pickle_protocol</span>
<span class="kn">from</span> <span class="nn">pyspark.util</span> <span class="kn">import</span> <span class="n">print_exec</span>
<span class="kn">from</span> <span class="nn">pyspark.errors</span> <span class="kn">import</span> <span class="n">PySparkRuntimeError</span>
<span class="k">if</span> <span class="n">TYPE_CHECKING</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkContext</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s2">&quot;Broadcast&quot;</span><span class="p">]</span>
<span class="n">T</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s2">&quot;T&quot;</span><span class="p">)</span>
<span class="c1"># Holds broadcasted data received from Java, keyed by its id.</span>
<span class="n">_broadcastRegistry</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="s2">&quot;Broadcast[Any]&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">def</span> <span class="nf">_from_id</span><span class="p">(</span><span class="n">bid</span><span class="p">:</span> <span class="nb">int</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;Broadcast[Any]&quot;</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">pyspark.broadcast</span> <span class="kn">import</span> <span class="n">_broadcastRegistry</span>
<span class="k">if</span> <span class="n">bid</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">_broadcastRegistry</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;BROADCAST_VARIABLE_NOT_LOADED&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span>
<span class="s2">&quot;variable&quot;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">bid</span><span class="p">),</span>
<span class="p">},</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">_broadcastRegistry</span><span class="p">[</span><span class="n">bid</span><span class="p">]</span>
<div class="viewcode-block" id="Broadcast"><a class="viewcode-back" href="../../reference/api/pyspark.Broadcast.html#pyspark.Broadcast">[docs]</a><span class="k">class</span> <span class="nc">Broadcast</span><span class="p">(</span><span class="n">Generic</span><span class="p">[</span><span class="n">T</span><span class="p">]):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A broadcast variable created with :meth:`SparkContext.broadcast`.</span>
<span class="sd"> Access its value through :attr:`value`.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])</span>
<span class="sd"> &gt;&gt;&gt; b.value</span>
<span class="sd"> [1, 2, 3, 4, 5]</span>
<span class="sd"> &gt;&gt;&gt; spark.sparkContext.parallelize([0, 0]).flatMap(lambda x: b.value).collect()</span>
<span class="sd"> [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]</span>
<span class="sd"> &gt;&gt;&gt; b.unpersist()</span>
<span class="sd"> &gt;&gt;&gt; large_broadcast = spark.sparkContext.broadcast(range(10000))</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nd">@overload</span> <span class="c1"># On driver</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;Broadcast[T]&quot;</span><span class="p">,</span>
<span class="n">sc</span><span class="p">:</span> <span class="s2">&quot;SparkContext&quot;</span><span class="p">,</span>
<span class="n">value</span><span class="p">:</span> <span class="n">T</span><span class="p">,</span>
<span class="n">pickle_registry</span><span class="p">:</span> <span class="s2">&quot;BroadcastPickleRegistry&quot;</span><span class="p">,</span>
<span class="p">):</span>
<span class="o">...</span>
<span class="nd">@overload</span> <span class="c1"># On worker without decryption server</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;Broadcast[Any]&quot;</span><span class="p">,</span> <span class="o">*</span><span class="p">,</span> <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">):</span>
<span class="o">...</span>
<span class="nd">@overload</span> <span class="c1"># On worker with decryption server</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;Broadcast[Any]&quot;</span><span class="p">,</span> <span class="o">*</span><span class="p">,</span> <span class="n">sock_file</span><span class="p">:</span> <span class="nb">str</span><span class="p">):</span>
<span class="o">...</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> <span class="c1"># type: ignore[misc]</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">sc</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="s2">&quot;SparkContext&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">value</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">T</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">pickle_registry</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="s2">&quot;BroadcastPickleRegistry&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">path</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">sock_file</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">BinaryIO</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Should not be called directly by users -- use :meth:`SparkContext.broadcast`</span>
<span class="sd"> instead.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">sc</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># we&#39;re on the driver. We want the pickled data to end up in a file (maybe encrypted)</span>
<span class="n">f</span> <span class="o">=</span> <span class="n">NamedTemporaryFile</span><span class="p">(</span><span class="n">delete</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="nb">dir</span><span class="o">=</span><span class="n">sc</span><span class="o">.</span><span class="n">_temp_dir</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_path</span> <span class="o">=</span> <span class="n">f</span><span class="o">.</span><span class="n">name</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="s2">&quot;SparkContext&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">sc</span>
<span class="k">assert</span> <span class="n">sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_python_broadcast</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonRDD</span><span class="o">.</span><span class="n">setupBroadcast</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_path</span><span class="p">)</span>
<span class="n">broadcast_out</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">ChunkedStream</span><span class="p">,</span> <span class="n">IO</span><span class="p">[</span><span class="nb">bytes</span><span class="p">]]</span>
<span class="k">if</span> <span class="n">sc</span><span class="o">.</span><span class="n">_encryption_enabled</span><span class="p">:</span>
<span class="c1"># with encryption, we ask the jvm to do the encryption for us, we send it data</span>
<span class="c1"># over a socket</span>
<span class="n">port</span><span class="p">,</span> <span class="n">auth_secret</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_python_broadcast</span><span class="o">.</span><span class="n">setupEncryptionServer</span><span class="p">()</span>
<span class="p">(</span><span class="n">encryption_sock_file</span><span class="p">,</span> <span class="n">_</span><span class="p">)</span> <span class="o">=</span> <span class="n">local_connect_and_auth</span><span class="p">(</span><span class="n">port</span><span class="p">,</span> <span class="n">auth_secret</span><span class="p">)</span>
<span class="n">broadcast_out</span> <span class="o">=</span> <span class="n">ChunkedStream</span><span class="p">(</span><span class="n">encryption_sock_file</span><span class="p">,</span> <span class="mi">8192</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># no encryption, we can just write pickled data directly to the file from python</span>
<span class="n">broadcast_out</span> <span class="o">=</span> <span class="n">f</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dump</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="n">broadcast_out</span><span class="p">)</span> <span class="c1"># type: ignore[arg-type]</span>
<span class="k">if</span> <span class="n">sc</span><span class="o">.</span><span class="n">_encryption_enabled</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_python_broadcast</span><span class="o">.</span><span class="n">waitTillDataReceived</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jbroadcast</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">broadcast</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_python_broadcast</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pickle_registry</span> <span class="o">=</span> <span class="n">pickle_registry</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># we&#39;re on an executor</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jbroadcast</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sc</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_python_broadcast</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">sock_file</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># the jvm is doing decryption for us. Read the value</span>
<span class="c1"># immediately from the sock_file</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_value</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="n">sock_file</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># the jvm just dumps the pickled data in path -- we&#39;ll unpickle lazily when</span>
<span class="c1"># the value is requested</span>
<span class="k">assert</span> <span class="n">path</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_path</span> <span class="o">=</span> <span class="n">path</span>
<div class="viewcode-block" id="Broadcast.dump"><a class="viewcode-back" href="../../reference/api/pyspark.Broadcast.dump.html#pyspark.Broadcast.dump">[docs]</a> <span class="k">def</span> <span class="nf">dump</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">:</span> <span class="n">T</span><span class="p">,</span> <span class="n">f</span><span class="p">:</span> <span class="n">BinaryIO</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Write a pickled representation of value to the open file or socket.</span>
<span class="sd"> The protocol pickle is HIGHEST_PROTOCOL.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> value : T</span>
<span class="sd"> Value to write.</span>
<span class="sd"> f : :class:`BinaryIO`</span>
<span class="sd"> File or socket where the pickled value will be stored.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; import os</span>
<span class="sd"> &gt;&gt;&gt; import tempfile</span>
<span class="sd"> &gt;&gt;&gt; b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])</span>
<span class="sd"> Write a pickled representation of `b` to the open temp file.</span>
<span class="sd"> &gt;&gt;&gt; with tempfile.TemporaryDirectory() as d:</span>
<span class="sd"> ... path = os.path.join(d, &quot;test.txt&quot;)</span>
<span class="sd"> ... with open(path, &quot;wb&quot;) as f:</span>
<span class="sd"> ... b.dump(b.value, f)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">pickle</span><span class="o">.</span><span class="n">dump</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="n">f</span><span class="p">,</span> <span class="n">pickle_protocol</span><span class="p">)</span>
<span class="k">except</span> <span class="n">pickle</span><span class="o">.</span><span class="n">PickleError</span><span class="p">:</span>
<span class="k">raise</span>
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="n">msg</span> <span class="o">=</span> <span class="s2">&quot;Could not serialize broadcast: </span><span class="si">%s</span><span class="s2">: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">))</span>
<span class="n">print_exec</span><span class="p">(</span><span class="n">sys</span><span class="o">.</span><span class="n">stderr</span><span class="p">)</span>
<span class="k">raise</span> <span class="n">pickle</span><span class="o">.</span><span class="n">PicklingError</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span>
<span class="n">f</span><span class="o">.</span><span class="n">close</span><span class="p">()</span></div>
<div class="viewcode-block" id="Broadcast.load_from_path"><a class="viewcode-back" href="../../reference/api/pyspark.Broadcast.load_from_path.html#pyspark.Broadcast.load_from_path">[docs]</a> <span class="k">def</span> <span class="nf">load_from_path</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">T</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Read the pickled representation of an object from the open file and</span>
<span class="sd"> return the reconstituted object hierarchy specified therein.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> path : str</span>
<span class="sd"> File path where reads the pickled value.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> T</span>
<span class="sd"> The object hierarchy specified therein reconstituted</span>
<span class="sd"> from the pickled representation of an object.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; import os</span>
<span class="sd"> &gt;&gt;&gt; import tempfile</span>
<span class="sd"> &gt;&gt;&gt; b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])</span>
<span class="sd"> &gt;&gt;&gt; c = spark.sparkContext.broadcast(1)</span>
<span class="sd"> Read the pickled representation of value from temp file.</span>
<span class="sd"> &gt;&gt;&gt; with tempfile.TemporaryDirectory() as d:</span>
<span class="sd"> ... path = os.path.join(d, &quot;test.txt&quot;)</span>
<span class="sd"> ... with open(path, &quot;wb&quot;) as f:</span>
<span class="sd"> ... b.dump(b.value, f)</span>
<span class="sd"> ... c.load_from_path(path)</span>
<span class="sd"> [1, 2, 3, 4, 5]</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="s2">&quot;rb&quot;</span><span class="p">,</span> <span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">20</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="n">f</span><span class="p">)</span></div>
<div class="viewcode-block" id="Broadcast.load"><a class="viewcode-back" href="../../reference/api/pyspark.Broadcast.load.html#pyspark.Broadcast.load">[docs]</a> <span class="k">def</span> <span class="nf">load</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">file</span><span class="p">:</span> <span class="n">BinaryIO</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">T</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Read a pickled representation of value from the open file or socket.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> file : :class:`BinaryIO`</span>
<span class="sd"> File or socket where the pickled value will be read.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> T</span>
<span class="sd"> The object hierarchy specified therein reconstituted</span>
<span class="sd"> from the pickled representation of an object.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; import os</span>
<span class="sd"> &gt;&gt;&gt; import tempfile</span>
<span class="sd"> &gt;&gt;&gt; b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])</span>
<span class="sd"> &gt;&gt;&gt; c = spark.sparkContext.broadcast(1)</span>
<span class="sd"> Read the pickled representation of value from the open temp file.</span>
<span class="sd"> &gt;&gt;&gt; with tempfile.TemporaryDirectory() as d:</span>
<span class="sd"> ... path = os.path.join(d, &quot;test.txt&quot;)</span>
<span class="sd"> ... with open(path, &quot;wb&quot;) as f:</span>
<span class="sd"> ... b.dump(b.value, f)</span>
<span class="sd"> ... with open(path, &quot;rb&quot;) as f:</span>
<span class="sd"> ... c.load(f)</span>
<span class="sd"> [1, 2, 3, 4, 5]</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">gc</span><span class="o">.</span><span class="n">disable</span><span class="p">()</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">return</span> <span class="n">pickle</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="n">file</span><span class="p">)</span>
<span class="k">finally</span><span class="p">:</span>
<span class="n">gc</span><span class="o">.</span><span class="n">enable</span><span class="p">()</span></div>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">value</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">T</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Return the broadcasted value&quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s2">&quot;_value&quot;</span><span class="p">)</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">_path</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># we only need to decrypt it here when encryption is enabled and</span>
<span class="c1"># if it&#39;s on the driver, since executor decryption is handled already</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_encryption_enabled</span><span class="p">:</span>
<span class="n">port</span><span class="p">,</span> <span class="n">auth_secret</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_python_broadcast</span><span class="o">.</span><span class="n">setupDecryptionServer</span><span class="p">()</span>
<span class="p">(</span><span class="n">decrypted_sock_file</span><span class="p">,</span> <span class="n">_</span><span class="p">)</span> <span class="o">=</span> <span class="n">local_connect_and_auth</span><span class="p">(</span><span class="n">port</span><span class="p">,</span> <span class="n">auth_secret</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_python_broadcast</span><span class="o">.</span><span class="n">waitTillBroadcastDataSent</span><span class="p">()</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="n">decrypted_sock_file</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_value</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">load_from_path</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_path</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_value</span>
<div class="viewcode-block" id="Broadcast.unpersist"><a class="viewcode-back" href="../../reference/api/pyspark.Broadcast.unpersist.html#pyspark.Broadcast.unpersist">[docs]</a> <span class="k">def</span> <span class="nf">unpersist</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">blocking</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Delete cached copies of this broadcast on the executors. If the</span>
<span class="sd"> broadcast is used after this is called, it will need to be</span>
<span class="sd"> re-sent to each executor.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> blocking : bool, optional, default False</span>
<span class="sd"> Whether to block until unpersisting has completed.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])</span>
<span class="sd"> Delete cached copies of this broadcast on the executors</span>
<span class="sd"> &gt;&gt;&gt; b.unpersist()</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jbroadcast</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_BROADCAST_OPERATION&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;operation&quot;</span><span class="p">:</span> <span class="s2">&quot;unpersisted&quot;</span><span class="p">},</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jbroadcast</span><span class="o">.</span><span class="n">unpersist</span><span class="p">(</span><span class="n">blocking</span><span class="p">)</span></div>
<div class="viewcode-block" id="Broadcast.destroy"><a class="viewcode-back" href="../../reference/api/pyspark.Broadcast.destroy.html#pyspark.Broadcast.destroy">[docs]</a> <span class="k">def</span> <span class="nf">destroy</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">blocking</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Destroy all data and metadata related to this broadcast variable.</span>
<span class="sd"> Use this with caution; once a broadcast variable has been destroyed,</span>
<span class="sd"> it cannot be used again.</span>
<span class="sd"> .. versionchanged:: 3.0.0</span>
<span class="sd"> Added optional argument `blocking` to specify whether to block until all</span>
<span class="sd"> blocks are deleted.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> blocking : bool, optional, default False</span>
<span class="sd"> Whether to block until destruction has completed (all blocks are deleted).</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])</span>
<span class="sd"> Destroy all data and metadata related to this broadcast variable</span>
<span class="sd"> &gt;&gt;&gt; b.destroy()</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jbroadcast</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_BROADCAST_OPERATION&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;operation&quot;</span><span class="p">:</span> <span class="s2">&quot;destroyed&quot;</span><span class="p">},</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jbroadcast</span><span class="o">.</span><span class="n">destroy</span><span class="p">(</span><span class="n">blocking</span><span class="p">)</span>
<span class="n">os</span><span class="o">.</span><span class="n">unlink</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_path</span><span class="p">)</span></div>
<span class="k">def</span> <span class="nf">__reduce__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Tuple</span><span class="p">[</span><span class="n">Callable</span><span class="p">[[</span><span class="nb">int</span><span class="p">],</span> <span class="s2">&quot;Broadcast[T]&quot;</span><span class="p">],</span> <span class="n">Tuple</span><span class="p">[</span><span class="nb">int</span><span class="p">]]:</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jbroadcast</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span>
<span class="n">error_class</span><span class="o">=</span><span class="s2">&quot;INVALID_BROADCAST_OPERATION&quot;</span><span class="p">,</span>
<span class="n">message_parameters</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;operation&quot;</span><span class="p">:</span> <span class="s2">&quot;serialized&quot;</span><span class="p">},</span>
<span class="p">)</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pickle_registry</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pickle_registry</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_from_id</span><span class="p">,</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jbroadcast</span><span class="o">.</span><span class="n">id</span><span class="p">(),)</span></div>
<span class="k">class</span> <span class="nc">BroadcastPickleRegistry</span><span class="p">(</span><span class="n">threading</span><span class="o">.</span><span class="n">local</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Thread-local registry for broadcast variables that have been pickled&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">setdefault</span><span class="p">(</span><span class="s2">&quot;_registry&quot;</span><span class="p">,</span> <span class="nb">set</span><span class="p">())</span>
<span class="k">def</span> <span class="fm">__iter__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">Broadcast</span><span class="p">[</span><span class="n">Any</span><span class="p">]]:</span>
<span class="k">for</span> <span class="n">bcast</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_registry</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">bcast</span>
<span class="k">def</span> <span class="nf">add</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">bcast</span><span class="p">:</span> <span class="n">Broadcast</span><span class="p">[</span><span class="n">Any</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_registry</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">bcast</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">clear</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_registry</span><span class="o">.</span><span class="n">clear</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">_test</span><span class="p">()</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">doctest</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span>
<span class="kn">import</span> <span class="nn">pyspark.broadcast</span>
<span class="n">globs</span> <span class="o">=</span> <span class="n">pyspark</span><span class="o">.</span><span class="n">broadcast</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span><span class="o">.</span><span class="n">builder</span><span class="o">.</span><span class="n">master</span><span class="p">(</span><span class="s2">&quot;local[4]&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">appName</span><span class="p">(</span><span class="s2">&quot;broadcast tests&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">getOrCreate</span><span class="p">()</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;spark&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">spark</span>
<span class="p">(</span><span class="n">failure_count</span><span class="p">,</span> <span class="n">test_count</span><span class="p">)</span> <span class="o">=</span> <span class="n">doctest</span><span class="o">.</span><span class="n">testmod</span><span class="p">(</span><span class="n">pyspark</span><span class="o">.</span><span class="n">broadcast</span><span class="p">,</span> <span class="n">globs</span><span class="o">=</span><span class="n">globs</span><span class="p">)</span>
<span class="n">spark</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="k">if</span> <span class="n">failure_count</span><span class="p">:</span>
<span class="n">sys</span><span class="o">.</span><span class="n">exit</span><span class="p">(</span><span class="o">-</span><span class="mi">1</span><span class="p">)</span>
<span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">&quot;__main__&quot;</span><span class="p">:</span>
<span class="n">_test</span><span class="p">()</span>
</pre></div>
</div>
<!-- Previous / next buttons -->
<div class='prev-next-area'>
</div>
</main>
</div>
</div>
<script src="../../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf"></script>
<footer class="footer mt-5 mt-md-0">
<div class="container">
<div class="footer-item">
<p class="copyright">
&copy; Copyright .<br>
</p>
</div>
<div class="footer-item">
<p class="sphinx-version">
Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 3.0.4.<br>
</p>
</div>
</div>
</footer>
</body>
</html>