| |
| <!DOCTYPE html> |
| |
| <html lang="en"> |
| <head> |
| <meta charset="utf-8" /> |
| <title>pyspark.context — PySpark master documentation</title> |
| |
| <link href="../../_static/styles/theme.css?digest=1999514e3f237ded88cf" rel="stylesheet"> |
| <link href="../../_static/styles/pydata-sphinx-theme.css?digest=1999514e3f237ded88cf" rel="stylesheet"> |
| |
| |
| <link rel="stylesheet" |
| href="../../_static/vendor/fontawesome/5.13.0/css/all.min.css"> |
| <link rel="preload" as="font" type="font/woff2" crossorigin |
| href="../../_static/vendor/fontawesome/5.13.0/webfonts/fa-solid-900.woff2"> |
| <link rel="preload" as="font" type="font/woff2" crossorigin |
| href="../../_static/vendor/fontawesome/5.13.0/webfonts/fa-brands-400.woff2"> |
| |
| |
| |
| |
| |
| <link rel="stylesheet" href="../../_static/styles/pydata-sphinx-theme.css" type="text/css" /> |
| <link rel="stylesheet" href="../../_static/pygments.css" type="text/css" /> |
| <link rel="stylesheet" type="text/css" href="../../_static/copybutton.css" /> |
| <link rel="stylesheet" type="text/css" href="../../_static/css/pyspark.css" /> |
| |
| <link rel="preload" as="script" href="../../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf"> |
| |
| <script id="documentation_options" data-url_root="../../" src="../../_static/documentation_options.js"></script> |
| <script src="../../_static/jquery.js"></script> |
| <script src="../../_static/underscore.js"></script> |
| <script src="../../_static/doctools.js"></script> |
| <script src="../../_static/language_data.js"></script> |
| <script src="../../_static/clipboard.min.js"></script> |
| <script src="../../_static/copybutton.js"></script> |
| <script crossorigin="anonymous" integrity="sha256-Ae2Vz/4ePdIu6ZyI/5ZGsYnb+m0JlOmKPjt6XZ9JJkA=" src="https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.4/require.min.js"></script> |
| <!-- MathJax picks up its inline configuration when it starts, so the config block must precede the loader script. --> |
| <script type="text/x-mathjax-config">MathJax.Hub.Config({"tex2jax": {"inlineMath": [["$", "$"], ["\\(", "\\)"]], "processEscapes": true, "ignoreClass": "tex2jax_ignore|mathjax_ignore|document", "processClass": "tex2jax_process|mathjax_process|math|output_area"}})</script> |
| <script async="async" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
| <link rel="canonical" href="https://spark.apache.org/docs/latest/api/python/_modules/pyspark/context.html" /> |
| <link rel="search" title="Search" href="../../search.html" /> |
| <meta name="viewport" content="width=device-width, initial-scale=1" /> |
| <meta name="docsearch:language" content="en"> |
| |
| |
| <!-- Google Analytics --> |
| |
| </head> |
| <body data-spy="scroll" data-target="#bd-toc-nav" data-offset="80"> |
| |
| <div class="container-fluid" id="banner"></div> |
| |
| |
| <nav class="navbar navbar-light navbar-expand-lg bg-light fixed-top bd-navbar" id="navbar-main"><div class="container-xl"> |
| |
| <div id="navbar-start"> |
| |
| |
| |
| <!-- Brand link back to the docs landing page; the image alt text is the link's only accessible name. --> |
| <a class="navbar-brand" href="../../index.html"> |
| <img src="../../_static/spark-logo-reverse.png" class="logo" alt="PySpark documentation home"> |
| </a> |
| |
| |
| |
| </div> |
| |
| <button class="navbar-toggler" type="button" data-toggle="collapse" data-target="#navbar-collapsible" aria-controls="navbar-collapsible" aria-expanded="false" aria-label="Toggle navigation"> |
| <span class="navbar-toggler-icon"></span> |
| </button> |
| |
| |
| <div id="navbar-collapsible" class="col-lg-9 collapse navbar-collapse"> |
| <div id="navbar-center" class="mr-auto"> |
| |
| <div class="navbar-center-item"> |
| <ul id="navbar-main-elements" class="navbar-nav"> |
| <li class="toctree-l1 nav-item"> |
| <a class="reference internal nav-link" href="../../index.html"> |
| Overview |
| </a> |
| </li> |
| |
| <li class="toctree-l1 nav-item"> |
| <a class="reference internal nav-link" href="../../getting_started/index.html"> |
| Getting Started |
| </a> |
| </li> |
| |
| <li class="toctree-l1 nav-item"> |
| <a class="reference internal nav-link" href="../../user_guide/index.html"> |
| User Guides |
| </a> |
| </li> |
| |
| <li class="toctree-l1 nav-item"> |
| <a class="reference internal nav-link" href="../../reference/index.html"> |
| API Reference |
| </a> |
| </li> |
| |
| <li class="toctree-l1 nav-item"> |
| <a class="reference internal nav-link" href="../../development/index.html"> |
| Development |
| </a> |
| </li> |
| |
| <li class="toctree-l1 nav-item"> |
| <a class="reference internal nav-link" href="../../migration_guide/index.html"> |
| Migration Guides |
| </a> |
| </li> |
| |
| |
| </ul> |
| </div> |
| |
| </div> |
| |
| <div id="navbar-end"> |
| |
| <div class="navbar-end-item"> |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <!-- Version switcher: the toggle button announces that it opens a menu and whether that menu is currently expanded. --> |
| <div id="version-button" class="dropdown"> |
| <button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false"> |
| master |
| <span class="caret"></span> |
| </button> |
| <div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button"> |
| <!-- dropdown will be populated by javascript on page load --> |
| </div> |
| </div> |
| |
| <script type="text/javascript"> |
| // Function to construct the target URL from the JSON components |
| function buildURL(entry) { |
| var template = "https://spark.apache.org/docs/{version}/api/python/index.html"; // supplied by jinja |
| template = template.replace("{version}", entry.version); |
| return template; |
| } |
| |
| // Function to check if corresponding page path exists in other version of docs |
| // and, if so, go there instead of the homepage of the other docs version |
| function checkPageExistsAndRedirect(event) { |
| const currentFilePath = "_modules/pyspark/context.html", |
| otherDocsHomepage = event.target.getAttribute("href"); |
| let tryUrl = `${otherDocsHomepage}${currentFilePath}`; |
| $.ajax({ |
| type: 'HEAD', |
| url: tryUrl, |
| // if the page exists, go there |
| success: function() { |
| location.href = tryUrl; |
| } |
| }).fail(function() { |
| location.href = otherDocsHomepage; |
| }); |
| return false; |
| } |
| |
| // Function to populate the version switcher |
| (function () { |
| // get JSON config |
| $.getJSON("_static/versions.json", function(data, textStatus, jqXHR) { |
| // create the nodes first (before AJAX calls) to ensure the order is |
| // correct (for now, links will go to doc version homepage) |
| $.each(data, function(index, entry) { |
| // if no custom name specified (e.g., "latest"), use version string |
| if (!("name" in entry)) { |
| entry.name = entry.version; |
| } |
| // construct the appropriate URL, and add it to the dropdown |
| entry.url = buildURL(entry); |
| const node = document.createElement("a"); |
| node.setAttribute("class", "list-group-item list-group-item-action py-1"); |
| node.setAttribute("href", `${entry.url}`); |
| node.textContent = `${entry.name}`; |
| node.onclick = checkPageExistsAndRedirect; |
| $("#version_switcher").append(node); |
| }); |
| }); |
| })(); |
| </script> |
| </div> |
| |
| </div> |
| </div> |
| </div> |
| </nav> |
| |
| |
| <div class="container-xl"> |
| <div class="row"> |
| |
| |
| <!-- Only show if we have sidebars configured, else just a small margin --> |
| <div class="col-12 col-md-3 bd-sidebar"> |
| <div class="sidebar-start-items"><form class="bd-search d-flex align-items-center" action="../../search.html" method="get"> |
| <!-- Decorative icon: the search input carries the accessible label, so hide the glyph from assistive technology. --> |
| <i class="icon fas fa-search" aria-hidden="true"></i> |
| <input type="search" class="form-control" name="q" id="search-input" placeholder="Search the docs ..." aria-label="Search the docs ..." autocomplete="off" > |
| </form><nav class="bd-links" id="bd-docs-nav" aria-label="Main navigation"> |
| <div class="bd-toc-item active"> |
| |
| </div> |
| </nav> |
| </div> |
| <div class="sidebar-end-items"> |
| </div> |
| </div> |
| |
| |
| |
| |
| <div class="d-none d-xl-block col-xl-2 bd-toc"> |
| |
| </div> |
| |
| |
| |
| |
| |
| |
| <main class="col-12 col-md-9 col-xl-7 py-md-5 pl-md-5 pr-md-4 bd-content" role="main"> |
| |
| <div> |
| |
| <h1>Source code for pyspark.context</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="kn">import</span> <span class="nn">os</span> |
| <span class="kn">import</span> <span class="nn">shutil</span> |
| <span class="kn">import</span> <span class="nn">signal</span> |
| <span class="kn">import</span> <span class="nn">sys</span> |
| <span class="kn">import</span> <span class="nn">threading</span> |
| <span class="kn">import</span> <span class="nn">warnings</span> |
| <span class="kn">import</span> <span class="nn">importlib</span> |
| <span class="kn">from</span> <span class="nn">threading</span> <span class="kn">import</span> <span class="n">RLock</span> |
| <span class="kn">from</span> <span class="nn">tempfile</span> <span class="kn">import</span> <span class="n">NamedTemporaryFile</span> |
| <span class="kn">from</span> <span class="nn">types</span> <span class="kn">import</span> <span class="n">TracebackType</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="p">(</span> |
| <span class="n">Any</span><span class="p">,</span> |
| <span class="n">Callable</span><span class="p">,</span> |
| <span class="n">cast</span><span class="p">,</span> |
| <span class="n">ClassVar</span><span class="p">,</span> |
| <span class="n">Dict</span><span class="p">,</span> |
| <span class="n">Iterable</span><span class="p">,</span> |
| <span class="n">List</span><span class="p">,</span> |
| <span class="n">NoReturn</span><span class="p">,</span> |
| <span class="n">Optional</span><span class="p">,</span> |
| <span class="n">Sequence</span><span class="p">,</span> |
| <span class="n">Tuple</span><span class="p">,</span> |
| <span class="n">Type</span><span class="p">,</span> |
| <span class="n">TYPE_CHECKING</span><span class="p">,</span> |
| <span class="n">TypeVar</span><span class="p">,</span> |
| <span class="n">Set</span><span class="p">,</span> |
| <span class="p">)</span> |
| |
| <span class="kn">from</span> <span class="nn">py4j.java_collections</span> <span class="kn">import</span> <span class="n">JavaMap</span> |
| <span class="kn">from</span> <span class="nn">py4j.protocol</span> <span class="kn">import</span> <span class="n">Py4JError</span> |
| |
| <span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">accumulators</span> |
| <span class="kn">from</span> <span class="nn">pyspark.accumulators</span> <span class="kn">import</span> <span class="n">Accumulator</span> |
| <span class="kn">from</span> <span class="nn">pyspark.broadcast</span> <span class="kn">import</span> <span class="n">Broadcast</span><span class="p">,</span> <span class="n">BroadcastPickleRegistry</span> |
| <span class="kn">from</span> <span class="nn">pyspark.conf</span> <span class="kn">import</span> <span class="n">SparkConf</span> |
| <span class="kn">from</span> <span class="nn">pyspark.files</span> <span class="kn">import</span> <span class="n">SparkFiles</span> |
| <span class="kn">from</span> <span class="nn">pyspark.java_gateway</span> <span class="kn">import</span> <span class="n">launch_gateway</span><span class="p">,</span> <span class="n">local_connect_and_auth</span> |
| <span class="kn">from</span> <span class="nn">pyspark.serializers</span> <span class="kn">import</span> <span class="p">(</span> |
| <span class="n">CPickleSerializer</span><span class="p">,</span> |
| <span class="n">BatchedSerializer</span><span class="p">,</span> |
| <span class="n">Serializer</span><span class="p">,</span> |
| <span class="n">UTF8Deserializer</span><span class="p">,</span> |
| <span class="n">PairDeserializer</span><span class="p">,</span> |
| <span class="n">AutoBatchedSerializer</span><span class="p">,</span> |
| <span class="n">NoOpSerializer</span><span class="p">,</span> |
| <span class="n">ChunkedStream</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="kn">from</span> <span class="nn">pyspark.storagelevel</span> <span class="kn">import</span> <span class="n">StorageLevel</span> |
| <span class="kn">from</span> <span class="nn">pyspark.resource.information</span> <span class="kn">import</span> <span class="n">ResourceInformation</span> |
| <span class="kn">from</span> <span class="nn">pyspark.rdd</span> <span class="kn">import</span> <span class="n">RDD</span><span class="p">,</span> <span class="n">_load_from_socket</span> |
| <span class="kn">from</span> <span class="nn">pyspark.taskcontext</span> <span class="kn">import</span> <span class="n">TaskContext</span> |
| <span class="kn">from</span> <span class="nn">pyspark.traceback_utils</span> <span class="kn">import</span> <span class="n">CallSite</span><span class="p">,</span> <span class="n">first_spark_call</span> |
| <span class="kn">from</span> <span class="nn">pyspark.status</span> <span class="kn">import</span> <span class="n">StatusTracker</span> |
| <span class="kn">from</span> <span class="nn">pyspark.profiler</span> <span class="kn">import</span> <span class="n">ProfilerCollector</span><span class="p">,</span> <span class="n">BasicProfiler</span><span class="p">,</span> <span class="n">UDFBasicProfiler</span><span class="p">,</span> <span class="n">MemoryProfiler</span> |
| <span class="kn">from</span> <span class="nn">pyspark.errors</span> <span class="kn">import</span> <span class="n">PySparkRuntimeError</span> |
| <span class="kn">from</span> <span class="nn">py4j.java_gateway</span> <span class="kn">import</span> <span class="n">is_instance_of</span><span class="p">,</span> <span class="n">JavaGateway</span><span class="p">,</span> <span class="n">JavaObject</span><span class="p">,</span> <span class="n">JVMView</span> |
| |
| <span class="k">if</span> <span class="n">TYPE_CHECKING</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">pyspark.accumulators</span> <span class="kn">import</span> <span class="n">AccumulatorParam</span> |
| |
| <span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s2">"SparkContext"</span><span class="p">]</span> |
| |
| |
| <span class="c1"># These are special default configs for PySpark, they will overwrite</span> |
| <span class="c1"># the default ones for Spark if they are not configured by user.</span> |
| <span class="n">DEFAULT_CONFIGS</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="s2">"spark.serializer.objectStreamReset"</span><span class="p">:</span> <span class="mi">100</span><span class="p">,</span> |
| <span class="s2">"spark.rdd.compress"</span><span class="p">:</span> <span class="kc">True</span><span class="p">,</span> |
| <span class="p">}</span> |
| |
| <span class="n">T</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s2">"T"</span><span class="p">)</span> |
| <span class="n">U</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s2">"U"</span><span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="SparkContext"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.html#pyspark.SparkContext">[docs]</a><span class="k">class</span> <span class="nc">SparkContext</span><span class="p">:</span> |
| |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Main entry point for Spark functionality. A SparkContext represents the</span> |
| <span class="sd"> connection to a Spark cluster, and can be used to create :class:`RDD` and</span> |
| <span class="sd"> broadcast variables on that cluster.</span> |
| |
| <span class="sd"> When you create a new SparkContext, at least the master and app name should</span> |
| <span class="sd"> be set, either through the named parameters here or through `conf`.</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> master : str, optional</span> |
| <span class="sd"> Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).</span> |
| <span class="sd"> appName : str, optional</span> |
| <span class="sd"> A name for your job, to display on the cluster web UI.</span> |
| <span class="sd"> sparkHome : str, optional</span> |
| <span class="sd"> Location where Spark is installed on cluster nodes.</span> |
| <span class="sd"> pyFiles : list, optional</span> |
| <span class="sd"> Collection of .zip or .py files to send to the cluster</span> |
| <span class="sd"> and add to PYTHONPATH. These can be paths on the local file</span> |
| <span class="sd"> system or HDFS, HTTP, HTTPS, or FTP URLs.</span> |
| <span class="sd"> environment : dict, optional</span> |
| <span class="sd"> A dictionary of environment variables to set on</span> |
| <span class="sd"> worker nodes.</span> |
| <span class="sd"> batchSize : int, optional, default 0</span> |
| <span class="sd"> The number of Python objects represented as a single</span> |
| <span class="sd"> Java object. Set 1 to disable batching, 0 to automatically choose</span> |
| <span class="sd"> the batch size based on object sizes, or -1 to use an unlimited</span> |
| <span class="sd"> batch size</span> |
| <span class="sd"> serializer : :class:`Serializer`, optional, default :class:`CPickleSerializer`</span> |
| <span class="sd"> The serializer for RDDs.</span> |
| <span class="sd"> conf : :class:`SparkConf`, optional</span> |
| <span class="sd"> An object setting Spark properties.</span> |
| <span class="sd"> gateway : class:`py4j.java_gateway.JavaGateway`, optional</span> |
| <span class="sd"> Use an existing gateway and JVM, otherwise a new JVM</span> |
| <span class="sd"> will be instantiated. This is only used internally.</span> |
| <span class="sd"> jsc : class:`py4j.java_gateway.JavaObject`, optional</span> |
| <span class="sd"> The JavaSparkContext instance. This is only used internally.</span> |
| <span class="sd"> profiler_cls : type, optional, default :class:`BasicProfiler`</span> |
| <span class="sd"> A class of custom Profiler used to do profiling</span> |
| <span class="sd"> udf_profiler_cls : type, optional, default :class:`UDFBasicProfiler`</span> |
| <span class="sd"> A class of custom Profiler used to do udf profiling</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> Only one :class:`SparkContext` should be active per JVM. You must `stop()`</span> |
| <span class="sd"> the active :class:`SparkContext` before creating a new one.</span> |
| |
| <span class="sd"> :class:`SparkContext` instance is not supported to share across multiple</span> |
| <span class="sd"> processes out of the box, and PySpark does not guarantee multi-processing execution.</span> |
| <span class="sd"> Use threads instead for concurrent processing purpose.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> from pyspark.context import SparkContext</span> |
| <span class="sd"> >>> sc = SparkContext('local', 'test')</span> |
| <span class="sd"> >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL</span> |
| <span class="sd"> Traceback (most recent call last):</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ValueError: ...</span> |
| <span class="sd"> """</span> |
| |
| <span class="n">_gateway</span><span class="p">:</span> <span class="n">ClassVar</span><span class="p">[</span><span class="n">Optional</span><span class="p">[</span><span class="n">JavaGateway</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="n">_jvm</span><span class="p">:</span> <span class="n">ClassVar</span><span class="p">[</span><span class="n">Optional</span><span class="p">[</span><span class="n">JVMView</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="n">_next_accum_id</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="n">_active_spark_context</span><span class="p">:</span> <span class="n">ClassVar</span><span class="p">[</span><span class="n">Optional</span><span class="p">[</span><span class="s2">"SparkContext"</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="n">_lock</span> <span class="o">=</span> <span class="n">RLock</span><span class="p">()</span> |
| <span class="n">_python_includes</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span> |
| <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> |
| <span class="p">]</span> <span class="o">=</span> <span class="kc">None</span> <span class="c1"># zip and egg files that need to be added to PYTHONPATH</span> |
| <span class="n">serializer</span><span class="p">:</span> <span class="n">Serializer</span> |
| <span class="n">profiler_collector</span><span class="p">:</span> <span class="n">ProfilerCollector</span> |
| |
| <span class="n">PACKAGE_EXTENSIONS</span><span class="p">:</span> <span class="n">Iterable</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span><span class="s2">".zip"</span><span class="p">,</span> <span class="s2">".egg"</span><span class="p">,</span> <span class="s2">".jar"</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">master</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">appName</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">sparkHome</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">pyFiles</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">environment</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">batchSize</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">0</span><span class="p">,</span> |
| <span class="n">serializer</span><span class="p">:</span> <span class="s2">"Serializer"</span> <span class="o">=</span> <span class="n">CPickleSerializer</span><span class="p">(),</span> |
| <span class="n">conf</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">SparkConf</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">gateway</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">JavaGateway</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">jsc</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">JavaObject</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">profiler_cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="n">BasicProfiler</span><span class="p">]</span> <span class="o">=</span> <span class="n">BasicProfiler</span><span class="p">,</span> |
| <span class="n">udf_profiler_cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="n">UDFBasicProfiler</span><span class="p">]</span> <span class="o">=</span> <span class="n">UDFBasicProfiler</span><span class="p">,</span> |
| <span class="n">memory_profiler_cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="n">MemoryProfiler</span><span class="p">]</span> <span class="o">=</span> <span class="n">MemoryProfiler</span><span class="p">,</span> |
| <span class="p">):</span> |
| <span class="k">if</span> <span class="s2">"SPARK_CONNECT_MODE_ENABLED"</span> <span class="ow">in</span> <span class="n">os</span><span class="o">.</span><span class="n">environ</span> <span class="ow">and</span> <span class="s2">"SPARK_LOCAL_REMOTE"</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">os</span><span class="o">.</span><span class="n">environ</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span> |
| <span class="n">error_class</span><span class="o">=</span><span class="s2">"CONTEXT_UNAVAILABLE_FOR_REMOTE_CLIENT"</span><span class="p">,</span> |
| <span class="n">message_parameters</span><span class="o">=</span><span class="p">{},</span> |
| <span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">conf</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">or</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"spark.executor.allowSparkContext"</span><span class="p">,</span> <span class="s2">"false"</span><span class="p">)</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span> <span class="o">!=</span> <span class="s2">"true"</span><span class="p">:</span> |
| <span class="c1"># In order to prevent SparkContext from being created in executors.</span> |
| <span class="n">SparkContext</span><span class="o">.</span><span class="n">_assert_on_driver</span><span class="p">()</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_callsite</span> <span class="o">=</span> <span class="n">first_spark_call</span><span class="p">()</span> <span class="ow">or</span> <span class="n">CallSite</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">gateway</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">gateway</span><span class="o">.</span><span class="n">gateway_parameters</span><span class="o">.</span><span class="n">auth_token</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"You are trying to pass an insecure Py4j gateway to Spark. This"</span> |
| <span class="s2">" is not allowed as it is a security risk."</span> |
| <span class="p">)</span> |
| |
| <span class="n">SparkContext</span><span class="o">.</span><span class="n">_ensure_initialized</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">gateway</span><span class="o">=</span><span class="n">gateway</span><span class="p">,</span> <span class="n">conf</span><span class="o">=</span><span class="n">conf</span><span class="p">)</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_do_init</span><span class="p">(</span> |
| <span class="n">master</span><span class="p">,</span> |
| <span class="n">appName</span><span class="p">,</span> |
| <span class="n">sparkHome</span><span class="p">,</span> |
| <span class="n">pyFiles</span><span class="p">,</span> |
| <span class="n">environment</span><span class="p">,</span> |
| <span class="n">batchSize</span><span class="p">,</span> |
| <span class="n">serializer</span><span class="p">,</span> |
| <span class="n">conf</span><span class="p">,</span> |
| <span class="n">jsc</span><span class="p">,</span> |
| <span class="n">profiler_cls</span><span class="p">,</span> |
| <span class="n">udf_profiler_cls</span><span class="p">,</span> |
| <span class="n">memory_profiler_cls</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="k">except</span> <span class="ne">BaseException</span><span class="p">:</span> |
| <span class="c1"># If an error occurs, clean up in order to allow future SparkContext creation:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span> |
| <span class="k">raise</span> |
| |
| <span class="k">def</span> <span class="nf">_do_init</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">master</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">],</span> |
| <span class="n">appName</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">],</span> |
| <span class="n">sparkHome</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">],</span> |
| <span class="n">pyFiles</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]],</span> |
| <span class="n">environment</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]],</span> |
| <span class="n">batchSize</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> |
| <span class="n">serializer</span><span class="p">:</span> <span class="n">Serializer</span><span class="p">,</span> |
| <span class="n">conf</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">SparkConf</span><span class="p">],</span> |
| <span class="n">jsc</span><span class="p">:</span> <span class="n">JavaObject</span><span class="p">,</span> |
| <span class="n">profiler_cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="n">BasicProfiler</span><span class="p">]</span> <span class="o">=</span> <span class="n">BasicProfiler</span><span class="p">,</span> |
| <span class="n">udf_profiler_cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="n">UDFBasicProfiler</span><span class="p">]</span> <span class="o">=</span> <span class="n">UDFBasicProfiler</span><span class="p">,</span> |
| <span class="n">memory_profiler_cls</span><span class="p">:</span> <span class="n">Type</span><span class="p">[</span><span class="n">MemoryProfiler</span><span class="p">]</span> <span class="o">=</span> <span class="n">MemoryProfiler</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">environment</span> <span class="o">=</span> <span class="n">environment</span> <span class="ow">or</span> <span class="p">{}</span> |
| <span class="c1"># java gateway must have been launched at this point.</span> |
| <span class="k">if</span> <span class="n">conf</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">conf</span><span class="o">.</span><span class="n">_jconf</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="c1"># conf has already been properly initialized in the JVM, so use it directly. This covers the</span> |
| <span class="c1"># scenario where the JVM was launched before the SparkConf was created (e.g. a SparkContext</span> |
| <span class="c1"># was created and then stopped, and we then create a new SparkConf and a new SparkContext).</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span> <span class="o">=</span> <span class="n">conf</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span> <span class="o">=</span> <span class="n">SparkConf</span><span class="p">(</span><span class="n">_jvm</span><span class="o">=</span><span class="n">SparkContext</span><span class="o">.</span><span class="n">_jvm</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">conf</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">conf</span><span class="o">.</span><span class="n">getAll</span><span class="p">():</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">set</span><span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_batchSize</span> <span class="o">=</span> <span class="n">batchSize</span> <span class="c1"># -1 represents an unlimited batch size</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_unbatched_serializer</span> <span class="o">=</span> <span class="n">serializer</span> |
| <span class="k">if</span> <span class="n">batchSize</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">serializer</span> <span class="o">=</span> <span class="n">AutoBatchedSerializer</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_unbatched_serializer</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">serializer</span> <span class="o">=</span> <span class="n">BatchedSerializer</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_unbatched_serializer</span><span class="p">,</span> <span class="n">batchSize</span><span class="p">)</span> |
| |
| <span class="c1"># Set any parameters passed directly to us on the conf</span> |
| <span class="k">if</span> <span class="n">master</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">setMaster</span><span class="p">(</span><span class="n">master</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">appName</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">setAppName</span><span class="p">(</span><span class="n">appName</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">sparkHome</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">setSparkHome</span><span class="p">(</span><span class="n">sparkHome</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">environment</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">key</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">environment</span><span class="o">.</span><span class="n">items</span><span class="p">():</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">setExecutorEnv</span><span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">key</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">DEFAULT_CONFIGS</span><span class="o">.</span><span class="n">items</span><span class="p">():</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">setIfMissing</span><span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span> |
| |
| <span class="c1"># Check that we have at least the required parameters</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">contains</span><span class="p">(</span><span class="s2">"spark.master"</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span> |
| <span class="n">error_class</span><span class="o">=</span><span class="s2">"MASTER_URL_NOT_SET"</span><span class="p">,</span> |
| <span class="n">message_parameters</span><span class="o">=</span><span class="p">{},</span> |
| <span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">contains</span><span class="p">(</span><span class="s2">"spark.app.name"</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span> |
| <span class="n">error_class</span><span class="o">=</span><span class="s2">"APPLICATION_NAME_NOT_SET"</span><span class="p">,</span> |
| <span class="n">message_parameters</span><span class="o">=</span><span class="p">{},</span> |
| <span class="p">)</span> |
| |
| <span class="c1"># Read back our properties from the conf in case we loaded some of them from</span> |
| <span class="c1"># the classpath or an external config file</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">master</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"spark.master"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">appName</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"spark.app.name"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sparkHome</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"spark.home"</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> |
| |
| <span class="k">for</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">)</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">getAll</span><span class="p">():</span> |
| <span class="k">if</span> <span class="n">k</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s2">"spark.executorEnv."</span><span class="p">):</span> |
| <span class="n">varName</span> <span class="o">=</span> <span class="n">k</span><span class="p">[</span><span class="nb">len</span><span class="p">(</span><span class="s2">"spark.executorEnv."</span><span class="p">)</span> <span class="p">:]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">environment</span><span class="p">[</span><span class="n">varName</span><span class="p">]</span> <span class="o">=</span> <span class="n">v</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">environment</span><span class="p">[</span><span class="s2">"PYTHONHASHSEED"</span><span class="p">]</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">environ</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"PYTHONHASHSEED"</span><span class="p">,</span> <span class="s2">"0"</span><span class="p">)</span> |
| |
| <span class="c1"># Create the Java SparkContext through Py4J</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span> <span class="o">=</span> <span class="n">jsc</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">_initialize_context</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">_jconf</span><span class="p">)</span> |
| <span class="c1"># Reset the SparkConf to the one actually used by the SparkContext in JVM.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span> <span class="o">=</span> <span class="n">SparkConf</span><span class="p">(</span><span class="n">_jconf</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">conf</span><span class="p">())</span> |
| |
| <span class="c1"># Create a single Accumulator in Java that we'll send all our updates through;</span> |
| <span class="c1"># they will be passed back to us through a TCP server</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_gateway</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">auth_token</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_gateway</span><span class="o">.</span><span class="n">gateway_parameters</span><span class="o">.</span><span class="n">auth_token</span> |
| <span class="n">start_update_server</span> <span class="o">=</span> <span class="n">accumulators</span><span class="o">.</span><span class="n">_start_update_server</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_accumulatorServer</span> <span class="o">=</span> <span class="n">start_update_server</span><span class="p">(</span><span class="n">auth_token</span><span class="p">)</span> |
| <span class="p">(</span><span class="n">host</span><span class="p">,</span> <span class="n">port</span><span class="p">)</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_accumulatorServer</span><span class="o">.</span><span class="n">server_address</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_javaAccumulator</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonAccumulatorV2</span><span class="p">(</span><span class="n">host</span><span class="p">,</span> <span class="n">port</span><span class="p">,</span> <span class="n">auth_token</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">register</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_javaAccumulator</span><span class="p">)</span> |
| |
| <span class="c1"># If encryption is enabled, we need to set up a server in the JVM to read broadcast</span> |
| <span class="c1"># data via a socket.</span> |
| <span class="c1"># Scala's mangled names with $ in them require special treatment.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_encryption_enabled</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonUtils</span><span class="o">.</span><span class="n">isEncryptionEnabled</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="p">)</span> |
| <span class="n">os</span><span class="o">.</span><span class="n">environ</span><span class="p">[</span><span class="s2">"SPARK_AUTH_SOCKET_TIMEOUT"</span><span class="p">]</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonUtils</span><span class="o">.</span><span class="n">getPythonAuthSocketTimeout</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="p">)</span> |
| <span class="p">)</span> |
| <span class="n">os</span><span class="o">.</span><span class="n">environ</span><span class="p">[</span><span class="s2">"SPARK_BUFFER_SIZE"</span><span class="p">]</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonUtils</span><span class="o">.</span><span class="n">getSparkBufferSize</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="p">))</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">pythonExec</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">environ</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"PYSPARK_PYTHON"</span><span class="p">,</span> <span class="s2">"python3"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pythonVer</span> <span class="o">=</span> <span class="s2">"</span><span class="si">%d</span><span class="s2">.</span><span class="si">%d</span><span class="s2">"</span> <span class="o">%</span> <span class="n">sys</span><span class="o">.</span><span class="n">version_info</span><span class="p">[:</span><span class="mi">2</span><span class="p">]</span> |
| |
| <span class="c1"># Broadcast's __reduce__ method stores Broadcast instances here.</span> |
| <span class="c1"># This allows other code to determine which Broadcast instances have</span> |
| <span class="c1"># been pickled, so it can determine which Java broadcast objects to</span> |
| <span class="c1"># send.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_pickled_broadcast_vars</span> <span class="o">=</span> <span class="n">BroadcastPickleRegistry</span><span class="p">()</span> |
| |
| <span class="n">SparkFiles</span><span class="o">.</span><span class="n">_sc</span> <span class="o">=</span> <span class="bp">self</span> |
| <span class="n">root_dir</span> <span class="o">=</span> <span class="n">SparkFiles</span><span class="o">.</span><span class="n">getRootDirectory</span><span class="p">()</span> |
| <span class="n">sys</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">insert</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="n">root_dir</span><span class="p">)</span> |
| |
| <span class="c1"># Deploy any code dependencies specified in the constructor</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_python_includes</span> <span class="o">=</span> <span class="nb">list</span><span class="p">()</span> |
| <span class="k">for</span> <span class="n">path</span> <span class="ow">in</span> <span class="n">pyFiles</span> <span class="ow">or</span> <span class="p">[]:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">addPyFile</span><span class="p">(</span><span class="n">path</span><span class="p">)</span> |
| |
| <span class="c1"># Deploy code dependencies set by spark-submit; these will already have been added</span> |
| <span class="c1"># with SparkContext.addFile, so we just need to add them to the PYTHONPATH</span> |
| <span class="k">for</span> <span class="n">path</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"spark.submit.pyFiles"</span><span class="p">,</span> <span class="s2">""</span><span class="p">)</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s2">","</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">path</span> <span class="o">!=</span> <span class="s2">""</span><span class="p">:</span> |
| <span class="p">(</span><span class="n">dirname</span><span class="p">,</span> <span class="n">filename</span><span class="p">)</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="n">path</span><span class="p">)</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">filepath</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">SparkFiles</span><span class="o">.</span><span class="n">getRootDirectory</span><span class="p">(),</span> <span class="n">filename</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">exists</span><span class="p">(</span><span class="n">filepath</span><span class="p">):</span> |
| <span class="c1"># In case of YARN with shell mode, 'spark.submit.pyFiles' files are</span> |
| <span class="c1"># not added via SparkContext.addFile. Here we check whether the file already exists in</span> |
| <span class="c1"># the root directory; if not, we try to copy it there before adding it to the path. See SPARK-21945.</span> |
| <span class="n">shutil</span><span class="o">.</span><span class="n">copyfile</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">filename</span><span class="p">[</span><span class="o">-</span><span class="mi">4</span><span class="p">:]</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">PACKAGE_EXTENSIONS</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_python_includes</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">filename</span><span class="p">)</span> |
| <span class="n">sys</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">insert</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span> |
| <span class="n">warnings</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span> |
| <span class="s2">"Failed to add file [</span><span class="si">%s</span><span class="s2">] specified in 'spark.submit.pyFiles' to "</span> |
| <span class="s2">"Python path:</span><span class="se">\n</span><span class="s2"> </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="s2">"</span><span class="se">\n</span><span class="s2"> "</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">sys</span><span class="o">.</span><span class="n">path</span><span class="p">)),</span> |
| <span class="ne">RuntimeWarning</span><span class="p">,</span> |
| <span class="p">)</span> |
| |
| <span class="c1"># Create a temporary directory inside spark.local.dir:</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">local_dir</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">util</span><span class="o">.</span><span class="n">Utils</span><span class="o">.</span><span class="n">getLocalDir</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">conf</span><span class="p">())</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_temp_dir</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">util</span><span class="o">.</span><span class="n">Utils</span><span class="o">.</span><span class="n">createTempDir</span><span class="p">(</span> |
| <span class="n">local_dir</span><span class="p">,</span> <span class="s2">"pyspark"</span> |
| <span class="p">)</span><span class="o">.</span><span class="n">getAbsolutePath</span><span class="p">()</span> |
| |
| <span class="c1"># profiling stats collected for each PythonRDD</span> |
| <span class="k">if</span> <span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"spark.python.profile"</span><span class="p">,</span> <span class="s2">"false"</span><span class="p">)</span> <span class="o">==</span> <span class="s2">"true"</span> |
| <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"spark.python.profile.memory"</span><span class="p">,</span> <span class="s2">"false"</span><span class="p">)</span> <span class="o">==</span> <span class="s2">"true"</span> |
| <span class="p">):</span> |
| <span class="n">dump_path</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"spark.python.profile.dump"</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">profiler_collector</span> <span class="o">=</span> <span class="n">ProfilerCollector</span><span class="p">(</span> |
| <span class="n">profiler_cls</span><span class="p">,</span> <span class="n">udf_profiler_cls</span><span class="p">,</span> <span class="n">memory_profiler_cls</span><span class="p">,</span> <span class="n">dump_path</span> |
| <span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">profiler_collector</span> <span class="o">=</span> <span class="kc">None</span> <span class="c1"># type: ignore[assignment]</span> |
| |
| <span class="c1"># create a signal handler which would be invoked on receiving SIGINT</span> |
| <span class="k">def</span> <span class="nf">signal_handler</span><span class="p">(</span><span class="n">signal</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="n">frame</span><span class="p">:</span> <span class="n">Any</span><span class="p">)</span> <span class="o">-></span> <span class="n">NoReturn</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">cancelAllJobs</span><span class="p">()</span> |
| <span class="k">raise</span> <span class="ne">KeyboardInterrupt</span><span class="p">()</span> |
| |
| <span class="c1"># see http://stackoverflow.com/questions/23206787/</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span> |
| <span class="n">threading</span><span class="o">.</span><span class="n">current_thread</span><span class="p">(),</span> <span class="n">threading</span><span class="o">.</span><span class="n">_MainThread</span> <span class="c1"># type: ignore[attr-defined]</span> |
| <span class="p">):</span> |
| <span class="n">signal</span><span class="o">.</span><span class="n">signal</span><span class="p">(</span><span class="n">signal</span><span class="o">.</span><span class="n">SIGINT</span><span class="p">,</span> <span class="n">signal_handler</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="fm">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="nb">str</span><span class="p">:</span> |
| <span class="k">return</span> <span class="s2">"<SparkContext master=</span><span class="si">{master}</span><span class="s2"> appName=</span><span class="si">{appName}</span><span class="s2">>"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">master</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">master</span><span class="p">,</span> |
| <span class="n">appName</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">appName</span><span class="p">,</span> |
| <span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">_repr_html_</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="nb">str</span><span class="p">:</span> |
| <span class="k">return</span> <span class="s2">"""</span> |
| <span class="s2"> <div></span> |
| <span class="s2"> <p><b>SparkContext</b></p></span> |
| |
| <span class="s2"> <p><a href="</span><span class="si">{sc.uiWebUrl}</span><span class="s2">">Spark UI</a></p></span> |
| |
| <span class="s2"> <dl></span> |
| <span class="s2"> <dt>Version</dt></span> |
| <span class="s2"> <dd><code>v</span><span class="si">{sc.version}</span><span class="s2"></code></dd></span> |
| <span class="s2"> <dt>Master</dt></span> |
| <span class="s2"> <dd><code></span><span class="si">{sc.master}</span><span class="s2"></code></dd></span> |
| <span class="s2"> <dt>AppName</dt></span> |
| <span class="s2"> <dd><code></span><span class="si">{sc.appName}</span><span class="s2"></code></dd></span> |
| <span class="s2"> </dl></span> |
| <span class="s2"> </div></span> |
| <span class="s2"> """</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">sc</span><span class="o">=</span><span class="bp">self</span> |
| <span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">_initialize_context</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">jconf</span><span class="p">:</span> <span class="n">JavaObject</span><span class="p">)</span> <span class="o">-></span> <span class="n">JavaObject</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Initialize the SparkContext in a separate function to allow subclass-specific initialization</span> |
| <span class="sd"> """</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">JavaSparkContext</span><span class="p">(</span><span class="n">jconf</span><span class="p">)</span> |
| |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_ensure_initialized</span><span class="p">(</span> |
| <span class="bp">cls</span><span class="p">,</span> |
| <span class="n">instance</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="s2">"SparkContext"</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">gateway</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">JavaGateway</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">conf</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">SparkConf</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Checks whether a SparkContext is initialized or not.</span> |
| <span class="sd"> Raises ValueError if a different SparkContext is already active.</span> |
| <span class="sd"> """</span> |
| <span class="k">with</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_lock</span><span class="p">:</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_gateway</span><span class="p">:</span> |
| <span class="n">SparkContext</span><span class="o">.</span><span class="n">_gateway</span> <span class="o">=</span> <span class="n">gateway</span> <span class="ow">or</span> <span class="n">launch_gateway</span><span class="p">(</span><span class="n">conf</span><span class="p">)</span> |
| <span class="n">SparkContext</span><span class="o">.</span><span class="n">_jvm</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_gateway</span><span class="o">.</span><span class="n">jvm</span> |
| |
| <span class="k">if</span> <span class="n">instance</span><span class="p">:</span> |
| <span class="k">if</span> <span class="p">(</span> |
| <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span> |
| <span class="ow">and</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span> <span class="o">!=</span> <span class="n">instance</span> |
| <span class="p">):</span> |
| <span class="n">currentMaster</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span><span class="o">.</span><span class="n">master</span> |
| <span class="n">currentAppName</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span><span class="o">.</span><span class="n">appName</span> |
| <span class="n">callsite</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span><span class="o">.</span><span class="n">_callsite</span> |
| |
| <span class="c1"># Raise error if there is already a running Spark context</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"Cannot run multiple SparkContexts at once; "</span> |
| <span class="s2">"existing SparkContext(app=</span><span class="si">%s</span><span class="s2">, master=</span><span class="si">%s</span><span class="s2">)"</span> |
| <span class="s2">" created by </span><span class="si">%s</span><span class="s2"> at </span><span class="si">%s</span><span class="s2">:</span><span class="si">%s</span><span class="s2"> "</span> |
| <span class="o">%</span> <span class="p">(</span> |
| <span class="n">currentAppName</span><span class="p">,</span> |
| <span class="n">currentMaster</span><span class="p">,</span> |
| <span class="n">callsite</span><span class="o">.</span><span class="n">function</span><span class="p">,</span> |
| <span class="n">callsite</span><span class="o">.</span><span class="n">file</span><span class="p">,</span> |
| <span class="n">callsite</span><span class="o">.</span><span class="n">linenum</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span> <span class="o">=</span> <span class="n">instance</span> |
| |
| <span class="k">def</span> <span class="nf">__getnewargs__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="n">NoReturn</span><span class="p">:</span> |
| <span class="c1"># This method is called when attempting to pickle SparkContext, which is always an error:</span> |
| <span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span> |
| <span class="n">error_class</span><span class="o">=</span><span class="s2">"CONTEXT_ONLY_VALID_ON_DRIVER"</span><span class="p">,</span> |
| <span class="n">message_parameters</span><span class="o">=</span><span class="p">{},</span> |
| <span class="p">)</span> |
| |
| <span class="k">def</span> <span class="fm">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"SparkContext"</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Enable 'with SparkContext(...) as sc: app(sc)' syntax.</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="bp">self</span> |
| |
| <span class="k">def</span> <span class="fm">__exit__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="nb">type</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Type</span><span class="p">[</span><span class="ne">BaseException</span><span class="p">]],</span> |
| <span class="n">value</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="ne">BaseException</span><span class="p">],</span> |
| <span class="n">trace</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">TracebackType</span><span class="p">],</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Enable 'with SparkContext(...) as sc: app(sc)' syntax.</span> |
| |
| <span class="sd"> Specifically stop the context on exit of the with block.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span> |
| |
| <div class="viewcode-block" id="SparkContext.getOrCreate"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.getOrCreate.html#pyspark.SparkContext.getOrCreate">[docs]</a> <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">getOrCreate</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">conf</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">SparkConf</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"SparkContext"</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Get or instantiate a :class:`SparkContext` and register it as a singleton object.</span> |
| |
| <span class="sd"> .. versionadded:: 1.4.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> conf : :class:`SparkConf`, optional</span> |
| <span class="sd"> :class:`SparkConf` that will be used for initialization of the :class:`SparkContext`.</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`SparkContext`</span> |
| <span class="sd"> current :class:`SparkContext`, or a new one if it wasn't created before the function</span> |
| <span class="sd"> call.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> SparkContext.getOrCreate()</span> |
| <span class="sd"> <SparkContext ...></span> |
| <span class="sd"> """</span> |
| <span class="k">with</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_lock</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">SparkContext</span><span class="p">(</span><span class="n">conf</span><span class="o">=</span><span class="n">conf</span> <span class="ow">or</span> <span class="n">SparkConf</span><span class="p">())</span> |
| <span class="k">assert</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="k">return</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.setLogLevel"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.setLogLevel.html#pyspark.SparkContext.setLogLevel">[docs]</a> <span class="k">def</span> <span class="nf">setLogLevel</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">logLevel</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Control the log level. This overrides any user-defined log settings.</span> |
| <span class="sd"> Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN</span> |
| |
| <span class="sd"> .. versionadded:: 1.4.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> logLevel : str</span> |
| <span class="sd"> The desired log level as a string.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> sc.setLogLevel("WARN") # doctest: +SKIP</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">setLogLevel</span><span class="p">(</span><span class="n">logLevel</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.setSystemProperty"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.setSystemProperty.html#pyspark.SparkContext.setSystemProperty">[docs]</a> <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">setSystemProperty</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">key</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">value</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Set a Java system property, such as `spark.executor.memory`. This must</span> |
| <span class="sd"> be invoked before instantiating :class:`SparkContext`.</span> |
| |
| <span class="sd"> .. versionadded:: 0.9.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> key : str</span> |
| <span class="sd"> The key of a new Java system property.</span> |
| <span class="sd"> value : str</span> |
| <span class="sd"> The value of a new Java system property.</span> |
| <span class="sd"> """</span> |
| <span class="n">SparkContext</span><span class="o">.</span><span class="n">_ensure_initialized</span><span class="p">()</span> |
| <span class="k">assert</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">SparkContext</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">java</span><span class="o">.</span><span class="n">lang</span><span class="o">.</span><span class="n">System</span><span class="o">.</span><span class="n">setProperty</span><span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span></div> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">version</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="nb">str</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> The version of Spark on which this application is running.</span> |
| |
| <span class="sd"> .. versionadded:: 1.1.0</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> _ = sc.version</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">version</span><span class="p">()</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">applicationId</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="nb">str</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> A unique identifier for the Spark application.</span> |
| <span class="sd"> Its format depends on the scheduler implementation.</span> |
| |
| <span class="sd"> * in case of local spark app something like 'local-1433865536131'</span> |
| <span class="sd"> * in case of YARN something like 'application_1433865536131_34483'</span> |
| |
| <span class="sd"> .. versionadded:: 1.5.0</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> sc.applicationId # doctest: +ELLIPSIS</span> |
| <span class="sd"> 'local-...'</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">applicationId</span><span class="p">()</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">uiWebUrl</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""Return the URL of the SparkUI instance started by this :class:`SparkContext`.</span> |
| |
| <span class="sd"> .. versionadded:: 2.1.0</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> When the web ui is disabled, e.g., by ``spark.ui.enabled`` set to ``False``,</span> |
| <span class="sd"> it returns ``None``.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> sc.uiWebUrl</span> |
| <span class="sd"> 'http://...'</span> |
| <span class="sd"> """</span> |
| <span class="n">jurl</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">uiWebUrl</span><span class="p">()</span> |
| <span class="k">return</span> <span class="n">jurl</span><span class="o">.</span><span class="n">get</span><span class="p">()</span> <span class="k">if</span> <span class="n">jurl</span><span class="o">.</span><span class="n">nonEmpty</span><span class="p">()</span> <span class="k">else</span> <span class="kc">None</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">startTime</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="nb">int</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""Return the epoch time when the :class:`SparkContext` was started.</span> |
| |
| <span class="sd"> .. versionadded:: 1.5.0</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> _ = sc.startTime</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">startTime</span><span class="p">()</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">defaultParallelism</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="nb">int</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Default level of parallelism to use when not given by user (e.g. for reduce tasks)</span> |
| |
| <span class="sd"> .. versionadded:: 0.7.0</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> sc.defaultParallelism > 0</span> |
| <span class="sd"> True</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">defaultParallelism</span><span class="p">()</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">defaultMinPartitions</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="nb">int</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Default min number of partitions for Hadoop RDDs when not given by user</span> |
| |
| <span class="sd"> .. versionadded:: 1.1.0</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> sc.defaultMinPartitions > 0</span> |
| <span class="sd"> True</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">defaultMinPartitions</span><span class="p">()</span> |
| |
| <div class="viewcode-block" id="SparkContext.stop"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.stop.html#pyspark.SparkContext.stop">[docs]</a> <span class="k">def</span> <span class="nf">stop</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Shut down the :class:`SparkContext`.</span> |
| |
| <span class="sd"> .. versionadded:: 0.7.0</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s2">"_jsc"</span><span class="p">,</span> <span class="kc">None</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span> |
| <span class="k">except</span> <span class="n">Py4JError</span><span class="p">:</span> |
| <span class="c1"># Case: SPARK-18523</span> |
| <span class="n">warnings</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span> |
| <span class="s2">"Unable to cleanly shutdown Spark JVM process."</span> |
| <span class="s2">" It is possible that the process has crashed,"</span> |
| <span class="s2">" been killed or may also be in a zombie state."</span><span class="p">,</span> |
| <span class="ne">RuntimeWarning</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="k">finally</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">if</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s2">"_accumulatorServer"</span><span class="p">,</span> <span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_accumulatorServer</span><span class="o">.</span><span class="n">shutdown</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_accumulatorServer</span> <span class="o">=</span> <span class="kc">None</span> <span class="c1"># type: ignore[assignment]</span> |
| <span class="k">with</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_lock</span><span class="p">:</span> |
| <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span> <span class="o">=</span> <span class="kc">None</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.emptyRDD"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.emptyRDD.html#pyspark.SparkContext.emptyRDD">[docs]</a> <span class="k">def</span> <span class="nf">emptyRDD</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="n">Any</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Create an :class:`RDD` that has no partitions or elements.</span> |
| |
| <span class="sd"> .. versionadded:: 1.5.0</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`RDD`</span> |
| <span class="sd"> An empty RDD</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> sc.emptyRDD()</span> |
| <span class="sd"> EmptyRDD...</span> |
| <span class="sd"> >>> sc.emptyRDD().count()</span> |
| <span class="sd"> 0</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">emptyRDD</span><span class="p">(),</span> <span class="bp">self</span><span class="p">,</span> <span class="n">NoOpSerializer</span><span class="p">())</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.range"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.range.html#pyspark.SparkContext.range">[docs]</a> <span class="k">def</span> <span class="nf">range</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="n">start</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> <span class="n">end</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> <span class="n">step</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">1</span><span class="p">,</span> <span class="n">numSlices</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="nb">int</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Create a new RDD of int containing elements from `start` to `end`</span> |
| <span class="sd"> (exclusive), incremented by `step` for each element. Can be called the same</span> |
| <span class="sd"> way as python's built-in range() function. If called with a single argument,</span> |
| <span class="sd"> the argument is interpreted as `end`, and `start` is set to 0.</span> |
| |
| <span class="sd"> .. versionadded:: 1.5.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> start : int</span> |
| <span class="sd"> the start value</span> |
| <span class="sd"> end : int, optional</span> |
| <span class="sd"> the end value (exclusive)</span> |
| <span class="sd"> step : int, optional, default 1</span> |
| <span class="sd"> the incremental step</span> |
| <span class="sd"> numSlices : int, optional</span> |
| <span class="sd"> the number of partitions of the new RDD</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`RDD`</span> |
| <span class="sd"> An RDD of int</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`pyspark.sql.SparkSession.range`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> sc.range(5).collect()</span> |
| <span class="sd"> [0, 1, 2, 3, 4]</span> |
| <span class="sd"> >>> sc.range(2, 4).collect()</span> |
| <span class="sd"> [2, 3]</span> |
| <span class="sd"> >>> sc.range(1, 7, 2).collect()</span> |
| <span class="sd"> [1, 3, 5]</span> |
| |
| <span class="sd"> Generate RDD with a negative step</span> |
| |
| <span class="sd"> >>> sc.range(5, 0, -1).collect()</span> |
| <span class="sd"> [5, 4, 3, 2, 1]</span> |
| <span class="sd"> >>> sc.range(0, 5, -1).collect()</span> |
| <span class="sd"> []</span> |
| |
| <span class="sd"> Control the number of partitions</span> |
| |
| <span class="sd"> >>> sc.range(5, numSlices=1).getNumPartitions()</span> |
| <span class="sd"> 1</span> |
| <span class="sd"> >>> sc.range(5, numSlices=10).getNumPartitions()</span> |
| <span class="sd"> 10</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">end</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">end</span> <span class="o">=</span> <span class="n">start</span> |
| <span class="n">start</span> <span class="o">=</span> <span class="mi">0</span> |
| |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="nb">range</span><span class="p">(</span><span class="n">start</span><span class="p">,</span> <span class="n">end</span><span class="p">,</span> <span class="n">step</span><span class="p">),</span> <span class="n">numSlices</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.parallelize"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.parallelize.html#pyspark.SparkContext.parallelize">[docs]</a> <span class="k">def</span> <span class="nf">parallelize</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">c</span><span class="p">:</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">T</span><span class="p">],</span> <span class="n">numSlices</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Distribute a local Python collection to form an RDD. For performance,</span> |
| <span class="sd"> using range is recommended if the input represents a range.</span> |
| |
| <span class="sd"> .. versionadded:: 0.7.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> c : :class:`collections.abc.Iterable`</span> |
| <span class="sd"> iterable collection to distribute</span> |
| <span class="sd"> numSlices : int, optional</span> |
| <span class="sd"> the number of partitions of the new RDD</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`RDD`</span> |
| <span class="sd"> RDD representing distributed collection.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()</span> |
| <span class="sd"> [[0], [2], [3], [4], [6]]</span> |
| <span class="sd"> >>> sc.parallelize(range(0, 6, 2), 5).glom().collect()</span> |
| <span class="sd"> [[], [0], [], [2], [4]]</span> |
| |
| <span class="sd"> Deal with a list of strings.</span> |
| |
| <span class="sd"> >>> strings = ["a", "b", "c"]</span> |
| <span class="sd"> >>> sc.parallelize(strings, 2).glom().collect()</span> |
| <span class="sd"> [['a'], ['b', 'c']]</span> |
| <span class="sd"> """</span> |
| <span class="n">numSlices</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">numSlices</span><span class="p">)</span> <span class="k">if</span> <span class="n">numSlices</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">defaultParallelism</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="nb">range</span><span class="p">):</span> |
| <span class="n">size</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">c</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">size</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">parallelize</span><span class="p">([],</span> <span class="n">numSlices</span><span class="p">)</span> |
| <span class="n">step</span> <span class="o">=</span> <span class="n">c</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> <span class="o">-</span> <span class="n">c</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="k">if</span> <span class="n">size</span> <span class="o">></span> <span class="mi">1</span> <span class="k">else</span> <span class="mi">1</span> <span class="c1"># type: ignore[index]</span> |
| <span class="n">start0</span> <span class="o">=</span> <span class="n">c</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="c1"># type: ignore[index]</span> |
| |
| <span class="k">def</span> <span class="nf">getStart</span><span class="p">(</span><span class="n">split</span><span class="p">:</span> <span class="nb">int</span><span class="p">)</span> <span class="o">-></span> <span class="nb">int</span><span class="p">:</span> |
| <span class="k">assert</span> <span class="n">numSlices</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="k">return</span> <span class="n">start0</span> <span class="o">+</span> <span class="nb">int</span><span class="p">((</span><span class="n">split</span> <span class="o">*</span> <span class="n">size</span> <span class="o">/</span> <span class="n">numSlices</span><span class="p">))</span> <span class="o">*</span> <span class="n">step</span> |
| |
| <span class="k">def</span> <span class="nf">f</span><span class="p">(</span><span class="n">split</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> <span class="n">iterator</span><span class="p">:</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">T</span><span class="p">])</span> <span class="o">-></span> <span class="n">Iterable</span><span class="p">:</span> |
| <span class="c1"># it's an empty iterator here but we need this line for triggering the</span> |
| <span class="c1"># logic of signal handling in FramedSerializer.load_stream, for instance,</span> |
| <span class="c1"># SpecialLengths.END_OF_DATA_SECTION in _read_with_length. Since</span> |
| <span class="c1"># FramedSerializer.load_stream produces a generator, the control should</span> |
| <span class="c1"># at least be in that function once. Here we do it by explicitly converting</span> |
| <span class="c1"># the empty iterator to a list, thus make sure worker reuse takes effect.</span> |
| <span class="c1"># See more details in SPARK-26549.</span> |
| <span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="nb">list</span><span class="p">(</span><span class="n">iterator</span><span class="p">))</span> <span class="o">==</span> <span class="mi">0</span> |
| <span class="k">return</span> <span class="nb">range</span><span class="p">(</span><span class="n">getStart</span><span class="p">(</span><span class="n">split</span><span class="p">),</span> <span class="n">getStart</span><span class="p">(</span><span class="n">split</span> <span class="o">+</span> <span class="mi">1</span><span class="p">),</span> <span class="n">step</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">parallelize</span><span class="p">([],</span> <span class="n">numSlices</span><span class="p">)</span><span class="o">.</span><span class="n">mapPartitionsWithIndex</span><span class="p">(</span><span class="n">f</span><span class="p">)</span> |
| |
| <span class="c1"># Make sure we distribute data evenly if it's smaller than self.batchSize</span> |
| <span class="k">if</span> <span class="s2">"__len__"</span> <span class="ow">not</span> <span class="ow">in</span> <span class="nb">dir</span><span class="p">(</span><span class="n">c</span><span class="p">):</span> |
| <span class="n">c</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">c</span><span class="p">)</span> <span class="c1"># Make it a list so we can compute its length</span> |
| <span class="n">batchSize</span> <span class="o">=</span> <span class="nb">max</span><span class="p">(</span> |
| <span class="mi">1</span><span class="p">,</span> <span class="nb">min</span><span class="p">(</span><span class="nb">len</span><span class="p">(</span><span class="n">c</span><span class="p">)</span> <span class="o">//</span> <span class="n">numSlices</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_batchSize</span> <span class="ow">or</span> <span class="mi">1024</span><span class="p">)</span> <span class="c1"># type: ignore[arg-type]</span> |
| <span class="p">)</span> |
| <span class="n">serializer</span> <span class="o">=</span> <span class="n">BatchedSerializer</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_unbatched_serializer</span><span class="p">,</span> <span class="n">batchSize</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">reader_func</span><span class="p">(</span><span class="n">temp_filename</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="n">JavaObject</span><span class="p">:</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonRDD</span><span class="o">.</span><span class="n">readRDDFromFile</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="p">,</span> <span class="n">temp_filename</span><span class="p">,</span> <span class="n">numSlices</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">createRDDServer</span><span class="p">()</span> <span class="o">-></span> <span class="n">JavaObject</span><span class="p">:</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonParallelizeServer</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">(),</span> <span class="n">numSlices</span><span class="p">)</span> |
| |
| <span class="n">jrdd</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_serialize_to_jvm</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="n">serializer</span><span class="p">,</span> <span class="n">reader_func</span><span class="p">,</span> <span class="n">createRDDServer</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span><span class="n">jrdd</span><span class="p">,</span> <span class="bp">self</span><span class="p">,</span> <span class="n">serializer</span><span class="p">)</span></div> |
| |
| <span class="k">def</span> <span class="nf">_serialize_to_jvm</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">data</span><span class="p">:</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">T</span><span class="p">],</span> |
| <span class="n">serializer</span><span class="p">:</span> <span class="n">Serializer</span><span class="p">,</span> |
| <span class="n">reader_func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">,</span> |
| <span class="n">server_func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">JavaObject</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Using Py4J to send a large dataset to the jvm is slow, so we use either a file</span> |
| <span class="sd"> or a socket if we have encryption enabled.</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> data</span> |
| <span class="sd"> object to be serialized</span> |
| <span class="sd"> serializer : :class:`pyspark.serializers.Serializer`</span> |
| <span class="sd"> reader_func : function</span> |
| <span class="sd"> A function which takes a filename and reads in the data in the jvm and</span> |
| <span class="sd"> returns a JavaRDD. Only used when encryption is disabled.</span> |
| <span class="sd"> server_func : function</span> |
| <span class="sd"> A function which creates a SocketAuthServer in the JVM to</span> |
| <span class="sd"> accept the serialized data, for use when encryption is enabled.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_encryption_enabled</span><span class="p">:</span> |
| <span class="c1"># with encryption, we open a server in java and send the data directly</span> |
| <span class="n">server</span> <span class="o">=</span> <span class="n">server_func</span><span class="p">()</span> |
| <span class="p">(</span><span class="n">sock_file</span><span class="p">,</span> <span class="n">_</span><span class="p">)</span> <span class="o">=</span> <span class="n">local_connect_and_auth</span><span class="p">(</span><span class="n">server</span><span class="o">.</span><span class="n">port</span><span class="p">(),</span> <span class="n">server</span><span class="o">.</span><span class="n">secret</span><span class="p">())</span> |
| <span class="n">chunked_out</span> <span class="o">=</span> <span class="n">ChunkedStream</span><span class="p">(</span><span class="n">sock_file</span><span class="p">,</span> <span class="mi">8192</span><span class="p">)</span> |
| <span class="n">serializer</span><span class="o">.</span><span class="n">dump_stream</span><span class="p">(</span><span class="n">data</span><span class="p">,</span> <span class="n">chunked_out</span><span class="p">)</span> |
| <span class="n">chunked_out</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="c1"># this call will block until the server has read all the data and processed it (or</span> |
| <span class="c1"># throws an exception)</span> |
| <span class="n">r</span> <span class="o">=</span> <span class="n">server</span><span class="o">.</span><span class="n">getResult</span><span class="p">()</span> |
| <span class="k">return</span> <span class="n">r</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># without encryption, we serialize to a file, and we read the file in java and</span> |
| <span class="c1"># parallelize from there.</span> |
| <span class="n">tempFile</span> <span class="o">=</span> <span class="n">NamedTemporaryFile</span><span class="p">(</span><span class="n">delete</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="nb">dir</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_temp_dir</span><span class="p">)</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">serializer</span><span class="o">.</span><span class="n">dump_stream</span><span class="p">(</span><span class="n">data</span><span class="p">,</span> <span class="n">tempFile</span><span class="p">)</span> |
| <span class="k">finally</span><span class="p">:</span> |
| <span class="n">tempFile</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="k">return</span> <span class="n">reader_func</span><span class="p">(</span><span class="n">tempFile</span><span class="o">.</span><span class="n">name</span><span class="p">)</span> |
| <span class="k">finally</span><span class="p">:</span> |
| <span class="c1"># we eagerly read the file so we can delete it right after.</span> |
| <span class="n">os</span><span class="o">.</span><span class="n">unlink</span><span class="p">(</span><span class="n">tempFile</span><span class="o">.</span><span class="n">name</span><span class="p">)</span> |
| |
| <div class="viewcode-block" id="SparkContext.pickleFile"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.pickleFile.html#pyspark.SparkContext.pickleFile">[docs]</a> <span class="k">def</span> <span class="nf">pickleFile</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">minPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="n">Any</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Load an RDD previously saved using :meth:`RDD.saveAsPickleFile` method.</span> |
| |
| <span class="sd"> .. versionadded:: 1.1.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> name : str</span> |
| <span class="sd"> directory to the input data files, the path can be comma separated</span> |
| <span class="sd"> paths as a list of inputs</span> |
| <span class="sd"> minPartitions : int, optional</span> |
| <span class="sd"> suggested minimum number of partitions for the resulting RDD</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`RDD`</span> |
| <span class="sd"> RDD representing unpickled data from the file(s).</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`RDD.saveAsPickleFile`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import os</span> |
| <span class="sd"> >>> import tempfile</span> |
| <span class="sd"> >>> with tempfile.TemporaryDirectory() as d:</span> |
| <span class="sd"> ... # Write a temporary pickled file</span> |
| <span class="sd"> ... path1 = os.path.join(d, "pickled1")</span> |
| <span class="sd"> ... sc.parallelize(range(10)).saveAsPickleFile(path1, 3)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Write another temporary pickled file</span> |
| <span class="sd"> ... path2 = os.path.join(d, "pickled2")</span> |
| <span class="sd"> ... sc.parallelize(range(-10, -5)).saveAsPickleFile(path2, 3)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Load pickled file</span> |
| <span class="sd"> ... collected1 = sorted(sc.pickleFile(path1, 3).collect())</span> |
| <span class="sd"> ... collected2 = sorted(sc.pickleFile(path2, 4).collect())</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Load two pickled files together</span> |
| <span class="sd"> ... collected3 = sorted(sc.pickleFile('{},{}'.format(path1, path2), 5).collect())</span> |
| |
| <span class="sd"> >>> collected1</span> |
| <span class="sd"> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]</span> |
| <span class="sd"> >>> collected2</span> |
| <span class="sd"> [-10, -9, -8, -7, -6]</span> |
| <span class="sd"> >>> collected3</span> |
| <span class="sd"> [-10, -9, -8, -7, -6, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]</span> |
| <span class="sd"> """</span> |
| <span class="n">minPartitions</span> <span class="o">=</span> <span class="n">minPartitions</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">defaultMinPartitions</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">objectFile</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">minPartitions</span><span class="p">),</span> <span class="bp">self</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.textFile"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.textFile.html#pyspark.SparkContext.textFile">[docs]</a> <span class="k">def</span> <span class="nf">textFile</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">minPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> <span class="n">use_unicode</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="nb">str</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Read a text file from HDFS, a local file system (available on all</span> |
| <span class="sd"> nodes), or any Hadoop-supported file system URI, and return it as an</span> |
| <span class="sd"> RDD of Strings. The text files must be encoded as UTF-8.</span> |
| |
| <span class="sd"> .. versionadded:: 0.7.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> name : str</span> |
| <span class="sd"> directory to the input data files, the path can be comma separated</span> |
| <span class="sd"> paths as a list of inputs</span> |
| <span class="sd"> minPartitions : int, optional</span> |
| <span class="sd"> suggested minimum number of partitions for the resulting RDD</span> |
| <span class="sd"> use_unicode : bool, default True</span> |
| <span class="sd"> If `use_unicode` is False, the strings will be kept as `str` (encoding</span> |
| <span class="sd"> as `utf-8`), which is faster and smaller than unicode.</span> |
| |
| <span class="sd"> .. versionadded:: 1.2.0</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`RDD`</span> |
| <span class="sd"> RDD representing text data from the file(s).</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`RDD.saveAsTextFile`</span> |
| <span class="sd"> :meth:`SparkContext.wholeTextFiles`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import os</span> |
| <span class="sd"> >>> import tempfile</span> |
| <span class="sd"> >>> with tempfile.TemporaryDirectory() as d:</span> |
| <span class="sd"> ... path1 = os.path.join(d, "text1")</span> |
| <span class="sd"> ... path2 = os.path.join(d, "text2")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Write a temporary text file</span> |
| <span class="sd"> ... sc.parallelize(["x", "y", "z"]).saveAsTextFile(path1)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Write another temporary text file</span> |
| <span class="sd"> ... sc.parallelize(["aa", "bb", "cc"]).saveAsTextFile(path2)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Load text file</span> |
| <span class="sd"> ... collected1 = sorted(sc.textFile(path1, 3).collect())</span> |
| <span class="sd"> ... collected2 = sorted(sc.textFile(path2, 4).collect())</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Load two text files together</span> |
| <span class="sd"> ... collected3 = sorted(sc.textFile('{},{}'.format(path1, path2), 5).collect())</span> |
| |
| <span class="sd"> >>> collected1</span> |
| <span class="sd"> ['x', 'y', 'z']</span> |
| <span class="sd"> >>> collected2</span> |
| <span class="sd"> ['aa', 'bb', 'cc']</span> |
| <span class="sd"> >>> collected3</span> |
| <span class="sd"> ['aa', 'bb', 'cc', 'x', 'y', 'z']</span> |
| <span class="sd"> """</span> |
| <span class="n">minPartitions</span> <span class="o">=</span> <span class="n">minPartitions</span> <span class="ow">or</span> <span class="nb">min</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">defaultParallelism</span><span class="p">,</span> <span class="mi">2</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">minPartitions</span><span class="p">),</span> <span class="bp">self</span><span class="p">,</span> <span class="n">UTF8Deserializer</span><span class="p">(</span><span class="n">use_unicode</span><span class="p">))</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.wholeTextFiles"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.wholeTextFiles.html#pyspark.SparkContext.wholeTextFiles">[docs]</a> <span class="k">def</span> <span class="nf">wholeTextFiles</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">minPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> <span class="n">use_unicode</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Read a directory of text files from HDFS, a local file system</span> |
| <span class="sd"> (available on all nodes), or any Hadoop-supported file system</span> |
| <span class="sd"> URI. Each file is read as a single record and returned in a</span> |
| <span class="sd"> key-value pair, where the key is the path of each file, the</span> |
| <span class="sd"> value is the content of each file.</span> |
| <span class="sd"> The text files must be encoded as UTF-8.</span> |
| |
| <span class="sd"> .. versionadded:: 1.0.0</span> |
| |
| <span class="sd"> For example, if you have the following files:</span> |
| |
| <span class="sd"> .. code-block:: text</span> |
| |
| <span class="sd"> hdfs://a-hdfs-path/part-00000</span> |
| <span class="sd"> hdfs://a-hdfs-path/part-00001</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> hdfs://a-hdfs-path/part-nnnnn</span> |
| |
| <span class="sd"> Do ``rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")``,</span> |
| <span class="sd"> then ``rdd`` contains:</span> |
| |
| <span class="sd"> .. code-block:: text</span> |
| |
| <span class="sd"> (a-hdfs-path/part-00000, its content)</span> |
| <span class="sd"> (a-hdfs-path/part-00001, its content)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> (a-hdfs-path/part-nnnnn, its content)</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str</span> |
| <span class="sd"> directory to the input data files, the path can be comma separated</span> |
| <span class="sd"> paths as a list of inputs</span> |
| <span class="sd"> minPartitions : int, optional</span> |
| <span class="sd"> suggested minimum number of partitions for the resulting RDD</span> |
| <span class="sd"> use_unicode : bool, default True</span> |
| <span class="sd"> If `use_unicode` is False, the strings will be kept as `str` (encoding</span> |
| <span class="sd"> as `utf-8`), which is faster and smaller than unicode.</span> |
| |
| <span class="sd"> .. versionadded:: 1.2.0</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`RDD`</span> |
| <span class="sd"> RDD representing path-content pairs from the file(s).</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> Small files are preferred, as each file will be loaded fully in memory.</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`RDD.saveAsTextFile`</span> |
| <span class="sd"> :meth:`SparkContext.textFile`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import os</span> |
| <span class="sd"> >>> import tempfile</span> |
| <span class="sd"> >>> with tempfile.TemporaryDirectory() as d:</span> |
| <span class="sd"> ... # Write a temporary text file</span> |
| <span class="sd"> ... with open(os.path.join(d, "1.txt"), "w") as f:</span> |
| <span class="sd"> ... _ = f.write("123")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Write another temporary text file</span> |
| <span class="sd"> ... with open(os.path.join(d, "2.txt"), "w") as f:</span> |
| <span class="sd"> ... _ = f.write("xyz")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... collected = sorted(sc.wholeTextFiles(d).collect())</span> |
| <span class="sd"> >>> collected</span> |
| <span class="sd"> [('.../1.txt', '123'), ('.../2.txt', 'xyz')]</span> |
| <span class="sd"> """</span> |
| <span class="n">minPartitions</span> <span class="o">=</span> <span class="n">minPartitions</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">defaultMinPartitions</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">wholeTextFiles</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="n">minPartitions</span><span class="p">),</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">PairDeserializer</span><span class="p">(</span><span class="n">UTF8Deserializer</span><span class="p">(</span><span class="n">use_unicode</span><span class="p">),</span> <span class="n">UTF8Deserializer</span><span class="p">(</span><span class="n">use_unicode</span><span class="p">)),</span> |
| <span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.binaryFiles"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.binaryFiles.html#pyspark.SparkContext.binaryFiles">[docs]</a> <span class="k">def</span> <span class="nf">binaryFiles</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">minPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">bytes</span><span class="p">]]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Read a directory of binary files from HDFS, a local file system</span> |
| <span class="sd"> (available on all nodes), or any Hadoop-supported file system URI</span> |
| <span class="sd"> as a byte array. Each file is read as a single record and returned</span> |
| <span class="sd"> in a key-value pair, where the key is the path of each file, the</span> |
| <span class="sd"> value is the content of each file.</span> |
| |
| <span class="sd"> .. versionadded:: 1.3.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str</span> |
| <span class="sd"> directory to the input data files, the path can be comma separated</span> |
| <span class="sd"> paths as a list of inputs</span> |
| <span class="sd"> minPartitions : int, optional</span> |
| <span class="sd"> suggested minimum number of partitions for the resulting RDD</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`RDD`</span> |
| <span class="sd"> RDD representing path-content pairs from the file(s).</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> Small files are preferred; large files are also allowed, but may cause bad performance.</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.binaryRecords`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import os</span> |
| <span class="sd"> >>> import tempfile</span> |
| <span class="sd"> >>> with tempfile.TemporaryDirectory() as d:</span> |
| <span class="sd"> ... # Write a temporary binary file</span> |
| <span class="sd"> ... with open(os.path.join(d, "1.bin"), "wb") as f1:</span> |
| <span class="sd"> ... _ = f1.write(b"binary data I")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Write another temporary binary file</span> |
| <span class="sd"> ... with open(os.path.join(d, "2.bin"), "wb") as f2:</span> |
| <span class="sd"> ... _ = f2.write(b"binary data II")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... collected = sorted(sc.binaryFiles(d).collect())</span> |
| |
| <span class="sd"> >>> collected</span> |
| <span class="sd"> [('.../1.bin', b'binary data I'), ('.../2.bin', b'binary data II')]</span> |
| <span class="sd"> """</span> |
| <span class="n">minPartitions</span> <span class="o">=</span> <span class="n">minPartitions</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">defaultMinPartitions</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">binaryFiles</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="n">minPartitions</span><span class="p">),</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">PairDeserializer</span><span class="p">(</span><span class="n">UTF8Deserializer</span><span class="p">(),</span> <span class="n">NoOpSerializer</span><span class="p">()),</span> |
| <span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.binaryRecords"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.binaryRecords.html#pyspark.SparkContext.binaryRecords">[docs]</a> <span class="k">def</span> <span class="nf">binaryRecords</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">recordLength</span><span class="p">:</span> <span class="nb">int</span><span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="nb">bytes</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Load data from a flat binary file, assuming each record is a set of numbers</span> |
| <span class="sd"> with the specified numerical format (see ByteBuffer), and the number of</span> |
| <span class="sd"> bytes per record is constant.</span> |
| |
| <span class="sd"> .. versionadded:: 1.3.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str</span> |
| <span class="sd"> Directory to the input data files</span> |
| <span class="sd"> recordLength : int</span> |
| <span class="sd"> The length at which to split the records</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`RDD`</span> |
| <span class="sd"> RDD of data with values, represented as byte arrays</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.binaryFiles`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import os</span> |
| <span class="sd"> >>> import tempfile</span> |
| <span class="sd"> >>> with tempfile.TemporaryDirectory() as d:</span> |
| <span class="sd"> ... # Write a temporary file</span> |
| <span class="sd"> ... with open(os.path.join(d, "1.bin"), "w") as f:</span> |
| <span class="sd"> ... for i in range(3):</span> |
| <span class="sd"> ... _ = f.write("%04d" % i)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Write another file</span> |
| <span class="sd"> ... with open(os.path.join(d, "2.bin"), "w") as f:</span> |
| <span class="sd"> ... for i in [-1, -2, -10]:</span> |
| <span class="sd"> ... _ = f.write("%04d" % i)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... collected = sorted(sc.binaryRecords(d, 4).collect())</span> |
| |
| <span class="sd"> >>> collected</span> |
| <span class="sd"> [b'-001', b'-002', b'-010', b'0000', b'0001', b'0002']</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">binaryRecords</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="n">recordLength</span><span class="p">),</span> <span class="bp">self</span><span class="p">,</span> <span class="n">NoOpSerializer</span><span class="p">())</span></div> |
| |
| <span class="k">def</span> <span class="nf">_dictToJavaMap</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">d</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]])</span> <span class="o">-></span> <span class="n">JavaMap</span><span class="p">:</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">jm</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">java</span><span class="o">.</span><span class="n">util</span><span class="o">.</span><span class="n">HashMap</span><span class="p">()</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">d</span><span class="p">:</span> |
| <span class="n">d</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">d</span><span class="o">.</span><span class="n">items</span><span class="p">():</span> |
| <span class="n">jm</span><span class="p">[</span><span class="n">k</span><span class="p">]</span> <span class="o">=</span> <span class="n">v</span> |
| <span class="k">return</span> <span class="n">jm</span> |
| |
| <div class="viewcode-block" id="SparkContext.sequenceFile"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.sequenceFile.html#pyspark.SparkContext.sequenceFile">[docs]</a> <span class="k">def</span> <span class="nf">sequenceFile</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">keyClass</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">valueClass</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">keyConverter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">valueConverter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">minSplits</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">batchSize</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">0</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">T</span><span class="p">,</span> <span class="n">U</span><span class="p">]]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,</span> |
| <span class="sd"> a local file system (available on all nodes), or any Hadoop-supported file system URI.</span> |
| <span class="sd"> The mechanism is as follows:</span> |
| |
| <span class="sd"> 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key</span> |
| <span class="sd"> and value Writable classes</span> |
| <span class="sd"> 2. Serialization is attempted via Pickle pickling</span> |
| <span class="sd"> 3. If this fails, the fallback is to call 'toString' on each key and value</span> |
| <span class="sd"> 4. :class:`CPickleSerializer` is used to deserialize pickled objects on the Python side</span> |
| |
| <span class="sd"> .. versionadded:: 1.3.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str</span> |
| <span class="sd"> path to sequencefile</span> |
| <span class="sd"> keyClass : str, optional</span> |
| <span class="sd"> fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")</span> |
| <span class="sd"> valueClass : str, optional</span> |
| <span class="sd"> fully qualified classname of value Writable class</span> |
| <span class="sd"> (e.g. "org.apache.hadoop.io.LongWritable")</span> |
| <span class="sd"> keyConverter : str, optional</span> |
| <span class="sd"> fully qualified name of a function returning key WritableConverter</span> |
| <span class="sd"> valueConverter : str, optional</span> |
| <span class="sd"> fully qualified name of a function returning value WritableConverter</span> |
| <span class="sd"> minSplits : int, optional</span> |
| <span class="sd"> minimum splits in dataset (default min(2, sc.defaultParallelism))</span> |
| <span class="sd"> batchSize : int, optional, default 0</span> |
| <span class="sd"> The number of Python objects represented as a single</span> |
| <span class="sd"> Java object. (default 0, choose batchSize automatically)</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`RDD`</span> |
| <span class="sd"> RDD of tuples of key and corresponding value</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`RDD.saveAsSequenceFile`</span> |
| <span class="sd"> :meth:`RDD.saveAsNewAPIHadoopFile`</span> |
| <span class="sd"> :meth:`RDD.saveAsHadoopFile`</span> |
| <span class="sd"> :meth:`SparkContext.newAPIHadoopFile`</span> |
| <span class="sd"> :meth:`SparkContext.hadoopFile`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import os</span> |
| <span class="sd"> >>> import tempfile</span> |
| |
| <span class="sd"> Set the class of output format</span> |
| |
| <span class="sd"> >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"</span> |
| |
| <span class="sd"> >>> with tempfile.TemporaryDirectory() as d:</span> |
| <span class="sd"> ... path = os.path.join(d, "hadoop_file")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Write a temporary Hadoop file</span> |
| <span class="sd"> ... rdd = sc.parallelize([(1, {3.0: "bb"}), (2, {1.0: "aa"}), (3, {2.0: "dd"})])</span> |
| <span class="sd"> ... rdd.saveAsNewAPIHadoopFile(path, output_format_class)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... collected = sorted(sc.sequenceFile(path).collect())</span> |
| |
| <span class="sd"> >>> collected</span> |
| <span class="sd"> [(1, {3.0: 'bb'}), (2, {1.0: 'aa'}), (3, {2.0: 'dd'})]</span> |
| <span class="sd"> """</span> |
| <span class="n">minSplits</span> <span class="o">=</span> <span class="n">minSplits</span> <span class="ow">or</span> <span class="nb">min</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">defaultParallelism</span><span class="p">,</span> <span class="mi">2</span><span class="p">)</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">jrdd</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonRDD</span><span class="o">.</span><span class="n">sequenceFile</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">,</span> |
| <span class="n">keyClass</span><span class="p">,</span> |
| <span class="n">valueClass</span><span class="p">,</span> |
| <span class="n">keyConverter</span><span class="p">,</span> |
| <span class="n">valueConverter</span><span class="p">,</span> |
| <span class="n">minSplits</span><span class="p">,</span> |
| <span class="n">batchSize</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span><span class="n">jrdd</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.newAPIHadoopFile"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.newAPIHadoopFile.html#pyspark.SparkContext.newAPIHadoopFile">[docs]</a> <span class="k">def</span> <span class="nf">newAPIHadoopFile</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">inputFormatClass</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">keyClass</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">valueClass</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">keyConverter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">valueConverter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">conf</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">batchSize</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">0</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">T</span><span class="p">,</span> <span class="n">U</span><span class="p">]]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,</span> |
| <span class="sd"> a local file system (available on all nodes), or any Hadoop-supported file system URI.</span> |
| <span class="sd"> The mechanism is the same as for :meth:`SparkContext.sequenceFile`.</span> |
| |
| <span class="sd"> A Hadoop configuration can be passed in as a Python dict. This will be converted into a</span> |
| <span class="sd"> Configuration in Java.</span> |
| |
| <span class="sd"> .. versionadded:: 1.1.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str</span> |
| <span class="sd"> path to Hadoop file</span> |
| <span class="sd"> inputFormatClass : str</span> |
| <span class="sd"> fully qualified classname of Hadoop InputFormat</span> |
| <span class="sd"> (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")</span> |
| <span class="sd"> keyClass : str</span> |
| <span class="sd"> fully qualified classname of key Writable class</span> |
| <span class="sd"> (e.g. "org.apache.hadoop.io.Text")</span> |
| <span class="sd"> valueClass : str</span> |
| <span class="sd"> fully qualified classname of value Writable class</span> |
| <span class="sd"> (e.g. "org.apache.hadoop.io.LongWritable")</span> |
| <span class="sd"> keyConverter : str, optional</span> |
| <span class="sd"> fully qualified name of a function returning key WritableConverter</span> |
| <span class="sd"> None by default</span> |
| <span class="sd"> valueConverter : str, optional</span> |
| <span class="sd"> fully qualified name of a function returning value WritableConverter</span> |
| <span class="sd"> None by default</span> |
| <span class="sd"> conf : dict, optional</span> |
| <span class="sd"> Hadoop configuration, passed in as a dict</span> |
| <span class="sd"> None by default</span> |
| <span class="sd"> batchSize : int, optional, default 0</span> |
| <span class="sd"> The number of Python objects represented as a single</span> |
| <span class="sd"> Java object. (default 0, choose batchSize automatically)</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`RDD`</span> |
| <span class="sd"> RDD of tuples of key and corresponding value</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`RDD.saveAsSequenceFile`</span> |
| <span class="sd"> :meth:`RDD.saveAsNewAPIHadoopFile`</span> |
| <span class="sd"> :meth:`RDD.saveAsHadoopFile`</span> |
| <span class="sd"> :meth:`SparkContext.sequenceFile`</span> |
| <span class="sd"> :meth:`SparkContext.hadoopFile`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import os</span> |
| <span class="sd"> >>> import tempfile</span> |
| |
| <span class="sd"> Set the related classes</span> |
| |
| <span class="sd"> >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"</span> |
| <span class="sd"> >>> input_format_class = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"</span> |
| <span class="sd"> >>> key_class = "org.apache.hadoop.io.IntWritable"</span> |
| <span class="sd"> >>> value_class = "org.apache.hadoop.io.Text"</span> |
| |
| <span class="sd"> >>> with tempfile.TemporaryDirectory() as d:</span> |
| <span class="sd"> ... path = os.path.join(d, "new_hadoop_file")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Write a temporary Hadoop file</span> |
| <span class="sd"> ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])</span> |
| <span class="sd"> ... rdd.saveAsNewAPIHadoopFile(path, output_format_class, key_class, value_class)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... loaded = sc.newAPIHadoopFile(path, input_format_class, key_class, value_class)</span> |
| <span class="sd"> ... collected = sorted(loaded.collect())</span> |
| |
| <span class="sd"> >>> collected</span> |
| <span class="sd"> [(1, ''), (1, 'a'), (3, 'x')]</span> |
| <span class="sd"> """</span> |
| <span class="n">jconf</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dictToJavaMap</span><span class="p">(</span><span class="n">conf</span><span class="p">)</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">jrdd</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonRDD</span><span class="o">.</span><span class="n">newAPIHadoopFile</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">,</span> |
| <span class="n">inputFormatClass</span><span class="p">,</span> |
| <span class="n">keyClass</span><span class="p">,</span> |
| <span class="n">valueClass</span><span class="p">,</span> |
| <span class="n">keyConverter</span><span class="p">,</span> |
| <span class="n">valueConverter</span><span class="p">,</span> |
| <span class="n">jconf</span><span class="p">,</span> |
| <span class="n">batchSize</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span><span class="n">jrdd</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.newAPIHadoopRDD"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.newAPIHadoopRDD.html#pyspark.SparkContext.newAPIHadoopRDD">[docs]</a> <span class="k">def</span> <span class="nf">newAPIHadoopRDD</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">inputFormatClass</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">keyClass</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">valueClass</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">keyConverter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">valueConverter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">conf</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">batchSize</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">0</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">T</span><span class="p">,</span> <span class="n">U</span><span class="p">]]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary</span> |
| <span class="sd"> Hadoop configuration, which is passed in as a Python dict.</span> |
| <span class="sd"> This will be converted into a Configuration in Java.</span> |
| <span class="sd"> The mechanism is the same as for :meth:`SparkContext.sequenceFile`.</span> |
| |
| <span class="sd"> .. versionadded:: 1.1.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> inputFormatClass : str</span> |
| <span class="sd"> fully qualified classname of Hadoop InputFormat</span> |
| <span class="sd"> (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")</span> |
| <span class="sd"> keyClass : str</span> |
| <span class="sd"> fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")</span> |
| <span class="sd"> valueClass : str</span> |
| <span class="sd"> fully qualified classname of value Writable class</span> |
| <span class="sd"> (e.g. "org.apache.hadoop.io.LongWritable")</span> |
| <span class="sd"> keyConverter : str, optional</span> |
| <span class="sd"> fully qualified name of a function returning key WritableConverter</span> |
| <span class="sd"> (None by default)</span> |
| <span class="sd"> valueConverter : str, optional</span> |
| <span class="sd"> fully qualified name of a function returning value WritableConverter</span> |
| <span class="sd"> (None by default)</span> |
| <span class="sd"> conf : dict, optional</span> |
| <span class="sd"> Hadoop configuration, passed in as a dict (None by default)</span> |
| <span class="sd"> batchSize : int, optional, default 0</span> |
| <span class="sd"> The number of Python objects represented as a single</span> |
| <span class="sd"> Java object. (default 0, choose batchSize automatically)</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`RDD`</span> |
| <span class="sd"> RDD of tuples of key and corresponding value</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`RDD.saveAsNewAPIHadoopDataset`</span> |
| <span class="sd"> :meth:`RDD.saveAsHadoopDataset`</span> |
| <span class="sd"> :meth:`SparkContext.hadoopRDD`</span> |
| <span class="sd"> :meth:`SparkContext.hadoopFile`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import os</span> |
| <span class="sd"> >>> import tempfile</span> |
| |
| <span class="sd"> Set the related classes</span> |
| |
| <span class="sd"> >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"</span> |
| <span class="sd"> >>> input_format_class = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"</span> |
| <span class="sd"> >>> key_class = "org.apache.hadoop.io.IntWritable"</span> |
| <span class="sd"> >>> value_class = "org.apache.hadoop.io.Text"</span> |
| |
| <span class="sd"> >>> with tempfile.TemporaryDirectory() as d:</span> |
| <span class="sd"> ... path = os.path.join(d, "new_hadoop_file")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Create the conf for writing</span> |
| <span class="sd"> ... write_conf = {</span> |
| <span class="sd"> ... "mapreduce.job.outputformat.class": (output_format_class),</span> |
| <span class="sd"> ... "mapreduce.job.output.key.class": key_class,</span> |
| <span class="sd"> ... "mapreduce.job.output.value.class": value_class,</span> |
| <span class="sd"> ... "mapreduce.output.fileoutputformat.outputdir": path,</span> |
| <span class="sd"> ... }</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Write a temporary Hadoop file</span> |
| <span class="sd"> ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])</span> |
| <span class="sd"> ... rdd.saveAsNewAPIHadoopDataset(conf=write_conf)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Create the conf for reading</span> |
| <span class="sd"> ... read_conf = {"mapreduce.input.fileinputformat.inputdir": path}</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... loaded = sc.newAPIHadoopRDD(input_format_class,</span> |
| <span class="sd"> ... key_class, value_class, conf=read_conf)</span> |
| <span class="sd"> ... collected = sorted(loaded.collect())</span> |
| |
| <span class="sd"> >>> collected</span> |
| <span class="sd"> [(1, ''), (1, 'a'), (3, 'x')]</span> |
| <span class="sd"> """</span> |
| <span class="n">jconf</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dictToJavaMap</span><span class="p">(</span><span class="n">conf</span><span class="p">)</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">jrdd</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonRDD</span><span class="o">.</span><span class="n">newAPIHadoopRDD</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="p">,</span> |
| <span class="n">inputFormatClass</span><span class="p">,</span> |
| <span class="n">keyClass</span><span class="p">,</span> |
| <span class="n">valueClass</span><span class="p">,</span> |
| <span class="n">keyConverter</span><span class="p">,</span> |
| <span class="n">valueConverter</span><span class="p">,</span> |
| <span class="n">jconf</span><span class="p">,</span> |
| <span class="n">batchSize</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span><span class="n">jrdd</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.hadoopFile"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.hadoopFile.html#pyspark.SparkContext.hadoopFile">[docs]</a> <span class="k">def</span> <span class="nf">hadoopFile</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">inputFormatClass</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">keyClass</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">valueClass</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">keyConverter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">valueConverter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">conf</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">batchSize</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">0</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">T</span><span class="p">,</span> <span class="n">U</span><span class="p">]]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,</span> |
| <span class="sd"> a local file system (available on all nodes), or any Hadoop-supported file system URI.</span> |
| <span class="sd"> The mechanism is the same as for :meth:`SparkContext.sequenceFile`.</span> |
| |
| <span class="sd"> .. versionadded:: 1.1.0</span> |
| |
| <span class="sd"> A Hadoop configuration can be passed in as a Python dict. This will be converted into a</span> |
| <span class="sd"> Configuration in Java.</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str</span> |
| <span class="sd"> path to Hadoop file</span> |
| <span class="sd"> inputFormatClass : str</span> |
| <span class="sd"> fully qualified classname of Hadoop InputFormat</span> |
| <span class="sd"> (e.g. "org.apache.hadoop.mapred.TextInputFormat")</span> |
| <span class="sd"> keyClass : str</span> |
| <span class="sd"> fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")</span> |
| <span class="sd"> valueClass : str</span> |
| <span class="sd"> fully qualified classname of value Writable class</span> |
| <span class="sd"> (e.g. "org.apache.hadoop.io.LongWritable")</span> |
| <span class="sd"> keyConverter : str, optional</span> |
| <span class="sd"> fully qualified name of a function returning key WritableConverter</span> |
| <span class="sd"> valueConverter : str, optional</span> |
| <span class="sd"> fully qualified name of a function returning value WritableConverter</span> |
| <span class="sd"> conf : dict, optional</span> |
| <span class="sd"> Hadoop configuration, passed in as a dict</span> |
| <span class="sd"> batchSize : int, optional, default 0</span> |
| <span class="sd"> The number of Python objects represented as a single</span> |
| <span class="sd"> Java object. (default 0, choose batchSize automatically)</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`RDD`</span> |
| <span class="sd"> RDD of tuples of key and corresponding value</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`RDD.saveAsSequenceFile`</span> |
| <span class="sd"> :meth:`RDD.saveAsNewAPIHadoopFile`</span> |
| <span class="sd"> :meth:`RDD.saveAsHadoopFile`</span> |
| <span class="sd"> :meth:`SparkContext.newAPIHadoopFile`</span> |
| <span class="sd"> :meth:`SparkContext.hadoopRDD`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import os</span> |
| <span class="sd"> >>> import tempfile</span> |
| |
| <span class="sd"> Set the related classes</span> |
| |
| <span class="sd"> >>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat"</span> |
| <span class="sd"> >>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat"</span> |
| <span class="sd"> >>> key_class = "org.apache.hadoop.io.IntWritable"</span> |
| <span class="sd"> >>> value_class = "org.apache.hadoop.io.Text"</span> |
| |
| <span class="sd"> >>> with tempfile.TemporaryDirectory() as d:</span> |
| <span class="sd"> ... path = os.path.join(d, "old_hadoop_file")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Write a temporary Hadoop file</span> |
| <span class="sd"> ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])</span> |
| <span class="sd"> ... rdd.saveAsHadoopFile(path, output_format_class, key_class, value_class)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... loaded = sc.hadoopFile(path, input_format_class, key_class, value_class)</span> |
| <span class="sd"> ... collected = sorted(loaded.collect())</span> |
| |
| <span class="sd"> >>> collected</span> |
| <span class="sd"> [(0, '1\\t'), (0, '1\\ta'), (0, '3\\tx')]</span> |
| <span class="sd"> """</span> |
| <span class="n">jconf</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dictToJavaMap</span><span class="p">(</span><span class="n">conf</span><span class="p">)</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">jrdd</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonRDD</span><span class="o">.</span><span class="n">hadoopFile</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">,</span> |
| <span class="n">inputFormatClass</span><span class="p">,</span> |
| <span class="n">keyClass</span><span class="p">,</span> |
| <span class="n">valueClass</span><span class="p">,</span> |
| <span class="n">keyConverter</span><span class="p">,</span> |
| <span class="n">valueConverter</span><span class="p">,</span> |
| <span class="n">jconf</span><span class="p">,</span> |
| <span class="n">batchSize</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span><span class="n">jrdd</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.hadoopRDD"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.hadoopRDD.html#pyspark.SparkContext.hadoopRDD">[docs]</a> <span class="k">def</span> <span class="nf">hadoopRDD</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">inputFormatClass</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">keyClass</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">valueClass</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">keyConverter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">valueConverter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">conf</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">batchSize</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">0</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">T</span><span class="p">,</span> <span class="n">U</span><span class="p">]]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary</span> |
| <span class="sd"> Hadoop configuration, which is passed in as a Python dict.</span> |
| <span class="sd"> This will be converted into a Configuration in Java.</span> |
| <span class="sd"> The mechanism is the same as for :meth:`SparkContext.sequenceFile`.</span> |
| |
| <span class="sd"> .. versionadded:: 1.1.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> inputFormatClass : str</span> |
| <span class="sd"> fully qualified classname of Hadoop InputFormat</span> |
| <span class="sd"> (e.g. "org.apache.hadoop.mapred.TextInputFormat")</span> |
| <span class="sd"> keyClass : str</span> |
| <span class="sd"> fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")</span> |
| <span class="sd"> valueClass : str</span> |
| <span class="sd"> fully qualified classname of value Writable class</span> |
| <span class="sd"> (e.g. "org.apache.hadoop.io.LongWritable")</span> |
| <span class="sd"> keyConverter : str, optional</span> |
| <span class="sd"> fully qualified name of a function returning key WritableConverter</span> |
| <span class="sd"> valueConverter : str, optional</span> |
| <span class="sd"> fully qualified name of a function returning value WritableConverter</span> |
| <span class="sd"> conf : dict, optional</span> |
| <span class="sd"> Hadoop configuration, passed in as a dict</span> |
| <span class="sd"> batchSize : int, optional, default 0</span> |
| <span class="sd"> The number of Python objects represented as a single</span> |
| <span class="sd"> Java object. (default 0, choose batchSize automatically)</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`RDD`</span> |
| <span class="sd"> RDD of tuples of key and corresponding value</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`RDD.saveAsNewAPIHadoopDataset`</span> |
| <span class="sd"> :meth:`RDD.saveAsHadoopDataset`</span> |
| <span class="sd"> :meth:`SparkContext.newAPIHadoopRDD`</span> |
| <span class="sd"> :meth:`SparkContext.hadoopFile`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import os</span> |
| <span class="sd"> >>> import tempfile</span> |
| |
| <span class="sd"> Set the related classes</span> |
| |
| <span class="sd"> >>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat"</span> |
| <span class="sd"> >>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat"</span> |
| <span class="sd"> >>> key_class = "org.apache.hadoop.io.IntWritable"</span> |
| <span class="sd"> >>> value_class = "org.apache.hadoop.io.Text"</span> |
| |
| <span class="sd"> >>> with tempfile.TemporaryDirectory() as d:</span> |
| <span class="sd"> ... path = os.path.join(d, "old_hadoop_file")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Create the conf for writing</span> |
| <span class="sd"> ... write_conf = {</span> |
| <span class="sd"> ... "mapred.output.format.class": output_format_class,</span> |
| <span class="sd"> ... "mapreduce.job.output.key.class": key_class,</span> |
| <span class="sd"> ... "mapreduce.job.output.value.class": value_class,</span> |
| <span class="sd"> ... "mapreduce.output.fileoutputformat.outputdir": path,</span> |
| <span class="sd"> ... }</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Write a temporary Hadoop file</span> |
| <span class="sd"> ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])</span> |
| <span class="sd"> ... rdd.saveAsHadoopDataset(conf=write_conf)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # Create the conf for reading</span> |
| <span class="sd"> ... read_conf = {"mapreduce.input.fileinputformat.inputdir": path}</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... loaded = sc.hadoopRDD(input_format_class, key_class, value_class, conf=read_conf)</span> |
| <span class="sd"> ... collected = sorted(loaded.collect())</span> |
| |
| <span class="sd"> >>> collected</span> |
| <span class="sd"> [(0, '1\\t'), (0, '1\\ta'), (0, '3\\tx')]</span> |
| <span class="sd"> """</span> |
| <span class="n">jconf</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dictToJavaMap</span><span class="p">(</span><span class="n">conf</span><span class="p">)</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">jrdd</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonRDD</span><span class="o">.</span><span class="n">hadoopRDD</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="p">,</span> |
| <span class="n">inputFormatClass</span><span class="p">,</span> |
| <span class="n">keyClass</span><span class="p">,</span> |
| <span class="n">valueClass</span><span class="p">,</span> |
| <span class="n">keyConverter</span><span class="p">,</span> |
| <span class="n">valueConverter</span><span class="p">,</span> |
| <span class="n">jconf</span><span class="p">,</span> |
| <span class="n">batchSize</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span><span class="n">jrdd</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span></div> |
| |
| <span class="k">def</span> <span class="nf">_checkpointFile</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">input_deserializer</span><span class="p">:</span> <span class="n">PairDeserializer</span><span class="p">)</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">:</span> |
| <span class="n">jrdd</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">checkpointFile</span><span class="p">(</span><span class="n">name</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span><span class="n">jrdd</span><span class="p">,</span> <span class="bp">self</span><span class="p">,</span> <span class="n">input_deserializer</span><span class="p">)</span> |
| |
| <div class="viewcode-block" id="SparkContext.union"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.union.html#pyspark.SparkContext.union">[docs]</a> <span class="k">def</span> <span class="nf">union</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">rdds</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]])</span> <span class="o">-></span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Build the union of a list of RDDs.</span> |
| |
| <span class="sd"> This supports unions of RDDs with different serialized formats,</span> |
| <span class="sd"> although this forces them to be reserialized using the default</span> |
| <span class="sd"> serializer.</span> |
| |
| <span class="sd"> .. versionadded:: 0.7.0</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`RDD.union`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import os</span> |
| <span class="sd"> >>> import tempfile</span> |
| <span class="sd"> >>> with tempfile.TemporaryDirectory() as d:</span> |
| <span class="sd"> ... # generate a text RDD</span> |
| <span class="sd"> ... with open(os.path.join(d, "union-text.txt"), "w") as f:</span> |
| <span class="sd"> ... _ = f.write("Hello")</span> |
| <span class="sd"> ... text_rdd = sc.textFile(d)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # generate another RDD</span> |
| <span class="sd"> ... parallelized = sc.parallelize(["World!"])</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... unioned = sorted(sc.union([text_rdd, parallelized]).collect())</span> |
| |
| <span class="sd"> >>> unioned</span> |
| <span class="sd"> ['Hello', 'World!']</span> |
| <span class="sd"> """</span> |
| <span class="n">first_jrdd_deserializer</span> <span class="o">=</span> <span class="n">rdds</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">_jrdd_deserializer</span> |
| <span class="k">if</span> <span class="nb">any</span><span class="p">(</span><span class="n">x</span><span class="o">.</span><span class="n">_jrdd_deserializer</span> <span class="o">!=</span> <span class="n">first_jrdd_deserializer</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">rdds</span><span class="p">):</span> |
| <span class="n">rdds</span> <span class="o">=</span> <span class="p">[</span><span class="n">x</span><span class="o">.</span><span class="n">_reserialize</span><span class="p">()</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">rdds</span><span class="p">]</span> |
| <span class="n">gw</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_gateway</span> |
| <span class="k">assert</span> <span class="n">gw</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">jvm</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_jvm</span> |
| <span class="k">assert</span> <span class="n">jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">jrdd_cls</span> <span class="o">=</span> <span class="n">jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">api</span><span class="o">.</span><span class="n">java</span><span class="o">.</span><span class="n">JavaRDD</span> |
| <span class="n">jpair_rdd_cls</span> <span class="o">=</span> <span class="n">jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">api</span><span class="o">.</span><span class="n">java</span><span class="o">.</span><span class="n">JavaPairRDD</span> |
| <span class="n">jdouble_rdd_cls</span> <span class="o">=</span> <span class="n">jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">api</span><span class="o">.</span><span class="n">java</span><span class="o">.</span><span class="n">JavaDoubleRDD</span> |
| <span class="k">if</span> <span class="n">is_instance_of</span><span class="p">(</span><span class="n">gw</span><span class="p">,</span> <span class="n">rdds</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">_jrdd</span><span class="p">,</span> <span class="n">jrdd_cls</span><span class="p">):</span> |
| <span class="bp">cls</span> <span class="o">=</span> <span class="n">jrdd_cls</span> |
| <span class="k">elif</span> <span class="n">is_instance_of</span><span class="p">(</span><span class="n">gw</span><span class="p">,</span> <span class="n">rdds</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">_jrdd</span><span class="p">,</span> <span class="n">jpair_rdd_cls</span><span class="p">):</span> |
| <span class="bp">cls</span> <span class="o">=</span> <span class="n">jpair_rdd_cls</span> |
| <span class="k">elif</span> <span class="n">is_instance_of</span><span class="p">(</span><span class="n">gw</span><span class="p">,</span> <span class="n">rdds</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">_jrdd</span><span class="p">,</span> <span class="n">jdouble_rdd_cls</span><span class="p">):</span> |
| <span class="bp">cls</span> <span class="o">=</span> <span class="n">jdouble_rdd_cls</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">cls_name</span> <span class="o">=</span> <span class="n">rdds</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">_jrdd</span><span class="o">.</span><span class="n">getClass</span><span class="p">()</span><span class="o">.</span><span class="n">getCanonicalName</span><span class="p">()</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">"Unsupported Java RDD class </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="n">cls_name</span><span class="p">)</span> |
| <span class="n">jrdds</span> <span class="o">=</span> <span class="n">gw</span><span class="o">.</span><span class="n">new_array</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">rdds</span><span class="p">))</span> |
| <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">rdds</span><span class="p">)):</span> |
| <span class="n">jrdds</span><span class="p">[</span><span class="n">i</span><span class="p">]</span> <span class="o">=</span> <span class="n">rdds</span><span class="p">[</span><span class="n">i</span><span class="p">]</span><span class="o">.</span><span class="n">_jrdd</span> |
| <span class="k">return</span> <span class="n">RDD</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">union</span><span class="p">(</span><span class="n">jrdds</span><span class="p">),</span> <span class="bp">self</span><span class="p">,</span> <span class="n">rdds</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.broadcast"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.broadcast.html#pyspark.SparkContext.broadcast">[docs]</a> <span class="k">def</span> <span class="nf">broadcast</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">:</span> <span class="n">T</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"Broadcast[T]"</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Broadcast a read-only variable to the cluster, returning a :class:`Broadcast`</span> |
| <span class="sd"> object for reading it in distributed functions. The variable will</span> |
| <span class="sd"> be sent to each executor only once.</span> |
| |
| <span class="sd"> .. versionadded:: 0.7.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> value : T</span> |
| <span class="sd"> value to broadcast to the Spark nodes</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`Broadcast`</span> |
| <span class="sd"> :class:`Broadcast` object, a read-only variable cached on each machine</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> mapping = {1: 10001, 2: 10002}</span> |
| <span class="sd"> >>> bc = sc.broadcast(mapping)</span> |
| |
| <span class="sd"> >>> rdd = sc.range(5)</span> |
| <span class="sd"> >>> rdd2 = rdd.map(lambda i: bc.value[i] if i in bc.value else -1)</span> |
| <span class="sd"> >>> rdd2.collect()</span> |
| <span class="sd"> [-1, 10001, 10002, -1, -1]</span> |
| |
| <span class="sd"> >>> bc.destroy()</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="n">Broadcast</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pickled_broadcast_vars</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.accumulator"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.accumulator.html#pyspark.SparkContext.accumulator">[docs]</a> <span class="k">def</span> <span class="nf">accumulator</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">:</span> <span class="n">T</span><span class="p">,</span> <span class="n">accum_param</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="s2">"AccumulatorParam[T]"</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="s2">"Accumulator[T]"</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Create an :class:`Accumulator` with the given initial value, using a given</span> |
| <span class="sd"> :class:`AccumulatorParam` helper object to define how to add values of the</span> |
| <span class="sd"> data type if provided. Default AccumulatorParams are used for integers</span> |
| <span class="sd"> and floating-point numbers if you do not provide one. For other types,</span> |
| <span class="sd"> a custom AccumulatorParam can be used.</span> |
| |
| <span class="sd"> .. versionadded:: 0.7.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> value : T</span> |
| <span class="sd"> initialized value</span> |
| <span class="sd"> accum_param : :class:`pyspark.AccumulatorParam`, optional</span> |
| <span class="sd"> helper object to define how to add values</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`Accumulator`</span> |
| <span class="sd"> `Accumulator` object, a shared variable that can be accumulated</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> acc = sc.accumulator(9)</span> |
| <span class="sd"> >>> acc.value</span> |
| <span class="sd"> 9</span> |
| <span class="sd"> >>> acc += 1</span> |
| <span class="sd"> >>> acc.value</span> |
| <span class="sd"> 10</span> |
| |
| <span class="sd"> An Accumulator object can be updated in RDD operations:</span> |
| |
| <span class="sd"> >>> rdd = sc.range(5)</span> |
| <span class="sd"> >>> def f(x):</span> |
| <span class="sd"> ... global acc</span> |
| <span class="sd"> ... acc += 1</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> >>> rdd.foreach(f)</span> |
| <span class="sd"> >>> acc.value</span> |
| <span class="sd"> 15</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">accum_param</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span> |
| <span class="n">accum_param</span> <span class="o">=</span> <span class="n">cast</span><span class="p">(</span><span class="s2">"AccumulatorParam[T]"</span><span class="p">,</span> <span class="n">accumulators</span><span class="o">.</span><span class="n">INT_ACCUMULATOR_PARAM</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="nb">float</span><span class="p">):</span> |
| <span class="n">accum_param</span> <span class="o">=</span> <span class="n">cast</span><span class="p">(</span><span class="s2">"AccumulatorParam[T]"</span><span class="p">,</span> <span class="n">accumulators</span><span class="o">.</span><span class="n">FLOAT_ACCUMULATOR_PARAM</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="nb">complex</span><span class="p">):</span> |
| <span class="n">accum_param</span> <span class="o">=</span> <span class="n">cast</span><span class="p">(</span><span class="s2">"AccumulatorParam[T]"</span><span class="p">,</span> <span class="n">accumulators</span><span class="o">.</span><span class="n">COMPLEX_ACCUMULATOR_PARAM</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">"No default accumulator param for type </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="nb">type</span><span class="p">(</span><span class="n">value</span><span class="p">))</span> |
| <span class="n">SparkContext</span><span class="o">.</span><span class="n">_next_accum_id</span> <span class="o">+=</span> <span class="mi">1</span> |
| <span class="k">return</span> <span class="n">Accumulator</span><span class="p">(</span><span class="n">SparkContext</span><span class="o">.</span><span class="n">_next_accum_id</span> <span class="o">-</span> <span class="mi">1</span><span class="p">,</span> <span class="n">value</span><span class="p">,</span> <span class="n">accum_param</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.addFile"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.addFile.html#pyspark.SparkContext.addFile">[docs]</a> <span class="k">def</span> <span class="nf">addFile</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">recursive</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Add a file to be downloaded with this Spark job on every node.</span> |
| <span class="sd"> The `path` passed can be either a local file, a file in HDFS</span> |
| <span class="sd"> (or other Hadoop-supported filesystems), or an HTTP, HTTPS or</span> |
| <span class="sd"> FTP URI.</span> |
| |
| <span class="sd"> To access the file in Spark jobs, use :meth:`SparkFiles.get` with the</span> |
| <span class="sd"> filename to find its download location.</span> |
| |
| <span class="sd"> A directory can be given if the recursive option is set to True.</span> |
| <span class="sd"> Currently directories are only supported for Hadoop-supported filesystems.</span> |
| |
| <span class="sd"> .. versionadded:: 0.7.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str</span> |
| <span class="sd"> can be either a local file, a file in HDFS (or other Hadoop-supported</span> |
| <span class="sd"> filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,</span> |
| <span class="sd"> use :meth:`SparkFiles.get` to find its download location.</span> |
| <span class="sd"> recursive : bool, default False</span> |
| <span class="sd"> whether to recursively add files in the input directory</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.listFiles`</span> |
| <span class="sd"> :meth:`SparkContext.addPyFile`</span> |
| <span class="sd"> :meth:`SparkFiles.get`</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> A path can be added only once. Subsequent additions of the same path are ignored.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import os</span> |
| <span class="sd"> >>> import tempfile</span> |
| <span class="sd"> >>> from pyspark import SparkFiles</span> |
| |
| <span class="sd"> >>> with tempfile.TemporaryDirectory() as d:</span> |
| <span class="sd"> ... path1 = os.path.join(d, "test1.txt")</span> |
| <span class="sd"> ... with open(path1, "w") as f:</span> |
| <span class="sd"> ... _ = f.write("100")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... path2 = os.path.join(d, "test2.txt")</span> |
| <span class="sd"> ... with open(path2, "w") as f:</span> |
| <span class="sd"> ... _ = f.write("200")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... sc.addFile(path1)</span> |
| <span class="sd"> ... file_list1 = sorted(sc.listFiles)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... sc.addFile(path2)</span> |
| <span class="sd"> ... file_list2 = sorted(sc.listFiles)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # add path2 twice, this addition will be ignored</span> |
| <span class="sd"> ... sc.addFile(path2)</span> |
| <span class="sd"> ... file_list3 = sorted(sc.listFiles)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... def func(iterator):</span> |
| <span class="sd"> ... with open(SparkFiles.get("test1.txt")) as f:</span> |
| <span class="sd"> ... mul = int(f.readline())</span> |
| <span class="sd"> ... return [x * mul for x in iterator]</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... collected = sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()</span> |
| |
| <span class="sd"> >>> file_list1</span> |
| <span class="sd"> ['file:/.../test1.txt']</span> |
| <span class="sd"> >>> file_list2</span> |
| <span class="sd"> ['file:/.../test1.txt', 'file:/.../test2.txt']</span> |
| <span class="sd"> >>> file_list3</span> |
| <span class="sd"> ['file:/.../test1.txt', 'file:/.../test2.txt']</span> |
| <span class="sd"> >>> collected</span> |
| <span class="sd"> [100, 200, 300, 400]</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">addFile</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="n">recursive</span><span class="p">)</span></div> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">listFiles</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""Returns a list of file paths that are added to resources.</span> |
| |
| <span class="sd"> .. versionadded:: 3.4.0</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.addFile`</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="nb">list</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">scala</span><span class="o">.</span><span class="n">collection</span><span class="o">.</span><span class="n">JavaConverters</span><span class="o">.</span><span class="n">seqAsJavaList</span><span class="p">(</span> <span class="c1"># type: ignore[union-attr]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">listFiles</span><span class="p">()</span> |
| <span class="p">)</span> |
| <span class="p">)</span> |
| |
| <div class="viewcode-block" id="SparkContext.addPyFile"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.addPyFile.html#pyspark.SparkContext.addPyFile">[docs]</a> <span class="k">def</span> <span class="nf">addPyFile</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Add a .py or .zip dependency for all tasks to be executed on this</span> |
| <span class="sd"> SparkContext in the future. The `path` passed can be either a local</span> |
| <span class="sd"> file, a file in HDFS (or other Hadoop-supported filesystems), or an</span> |
| <span class="sd"> HTTP, HTTPS or FTP URI.</span> |
| |
| <span class="sd"> .. versionadded:: 0.7.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str</span> |
| <span class="sd"> can be either a .py file or .zip dependency.</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.addFile`</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> A path can be added only once. Subsequent additions of the same path are ignored.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">addFile</span><span class="p">(</span><span class="n">path</span><span class="p">)</span> |
| <span class="p">(</span><span class="n">dirname</span><span class="p">,</span> <span class="n">filename</span><span class="p">)</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="n">path</span><span class="p">)</span> <span class="c1"># dirname may be directory or HDFS/S3 prefix</span> |
| <span class="k">if</span> <span class="n">filename</span><span class="p">[</span><span class="o">-</span><span class="mi">4</span><span class="p">:]</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">PACKAGE_EXTENSIONS</span><span class="p">:</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_python_includes</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_python_includes</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">filename</span><span class="p">)</span> |
| <span class="c1"># for tests in local mode</span> |
| <span class="n">sys</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">insert</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">SparkFiles</span><span class="o">.</span><span class="n">getRootDirectory</span><span class="p">(),</span> <span class="n">filename</span><span class="p">))</span> |
| |
| <span class="n">importlib</span><span class="o">.</span><span class="n">invalidate_caches</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.addArchive"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.addArchive.html#pyspark.SparkContext.addArchive">[docs]</a> <span class="k">def</span> <span class="nf">addArchive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Add an archive to be downloaded with this Spark job on every node.</span> |
| <span class="sd"> The `path` passed can be either a local file, a file in HDFS</span> |
| <span class="sd"> (or other Hadoop-supported filesystems), or an HTTP, HTTPS or</span> |
| <span class="sd"> FTP URI.</span> |
| |
| <span class="sd"> To access the file in Spark jobs, use :meth:`SparkFiles.get` with the</span> |
| <span class="sd"> filename to find its download/unpacked location. The given path should</span> |
| <span class="sd"> be one of .zip, .tar, .tar.gz, .tgz and .jar.</span> |
| |
| <span class="sd"> .. versionadded:: 3.3.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str</span> |
| <span class="sd"> can be either a local file, a file in HDFS (or other Hadoop-supported</span> |
| <span class="sd"> filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,</span> |
| <span class="sd"> use :meth:`SparkFiles.get` to find its download location.</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.listArchives`</span> |
| <span class="sd"> :meth:`SparkFiles.get`</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> A path can be added only once. Subsequent additions of the same path are ignored.</span> |
| <span class="sd"> This API is experimental.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> Creates a zipped file that contains a text file with the content '100'.</span> |
| |
| <span class="sd"> >>> import os</span> |
| <span class="sd"> >>> import tempfile</span> |
| <span class="sd"> >>> import zipfile</span> |
| <span class="sd"> >>> from pyspark import SparkFiles</span> |
| |
| <span class="sd"> >>> with tempfile.TemporaryDirectory() as d:</span> |
| <span class="sd"> ... path = os.path.join(d, "test.txt")</span> |
| <span class="sd"> ... with open(path, "w") as f:</span> |
| <span class="sd"> ... _ = f.write("100")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... zip_path1 = os.path.join(d, "test1.zip")</span> |
| <span class="sd"> ... with zipfile.ZipFile(zip_path1, "w", zipfile.ZIP_DEFLATED) as z:</span> |
| <span class="sd"> ... z.write(path, os.path.basename(path))</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... zip_path2 = os.path.join(d, "test2.zip")</span> |
| <span class="sd"> ... with zipfile.ZipFile(zip_path2, "w", zipfile.ZIP_DEFLATED) as z:</span> |
| <span class="sd"> ... z.write(path, os.path.basename(path))</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... sc.addArchive(zip_path1)</span> |
| <span class="sd"> ... arch_list1 = sorted(sc.listArchives)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... sc.addArchive(zip_path2)</span> |
| <span class="sd"> ... arch_list2 = sorted(sc.listArchives)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... # add zip_path2 a second time; this addition will be ignored</span> |
| <span class="sd"> ... sc.addArchive(zip_path2)</span> |
| <span class="sd"> ... arch_list3 = sorted(sc.listArchives)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... def func(iterator):</span> |
| <span class="sd"> ... with open("%s/test.txt" % SparkFiles.get("test1.zip")) as f:</span> |
| <span class="sd"> ... mul = int(f.readline())</span> |
| <span class="sd"> ... return [x * mul for x in iterator]</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> ... collected = sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()</span> |
| |
| <span class="sd"> >>> arch_list1</span> |
| <span class="sd"> ['file:/.../test1.zip']</span> |
| <span class="sd"> >>> arch_list2</span> |
| <span class="sd"> ['file:/.../test1.zip', 'file:/.../test2.zip']</span> |
| <span class="sd"> >>> arch_list3</span> |
| <span class="sd"> ['file:/.../test1.zip', 'file:/.../test2.zip']</span> |
| <span class="sd"> >>> collected</span> |
| <span class="sd"> [100, 200, 300, 400]</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">addArchive</span><span class="p">(</span><span class="n">path</span><span class="p">)</span></div> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">listArchives</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""Returns a list of archive paths that are added to resources.</span> |
| |
| <span class="sd"> .. versionadded:: 3.4.0</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.addArchive`</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="nb">list</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">scala</span><span class="o">.</span><span class="n">collection</span><span class="o">.</span><span class="n">JavaConverters</span><span class="o">.</span><span class="n">seqAsJavaList</span><span class="p">(</span> <span class="c1"># type: ignore[union-attr]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">listArchives</span><span class="p">()</span> |
| <span class="p">)</span> |
| <span class="p">)</span> |
| |
| <div class="viewcode-block" id="SparkContext.setCheckpointDir"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.setCheckpointDir.html#pyspark.SparkContext.setCheckpointDir">[docs]</a> <span class="k">def</span> <span class="nf">setCheckpointDir</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dirName</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Set the directory under which RDDs are going to be checkpointed. The</span> |
| <span class="sd"> directory must be an HDFS path if running on a cluster.</span> |
| |
| <span class="sd"> .. versionadded:: 0.7.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> dirName : str</span> |
| <span class="sd"> path to the directory where checkpoint files will be stored</span> |
| <span class="sd"> (must be an HDFS path if running on a cluster)</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.getCheckpointDir`</span> |
| <span class="sd"> :meth:`RDD.checkpoint`</span> |
| <span class="sd"> :meth:`RDD.getCheckpointFile`</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">setCheckpointDir</span><span class="p">(</span><span class="n">dirName</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.getCheckpointDir"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.getCheckpointDir.html#pyspark.SparkContext.getCheckpointDir">[docs]</a> <span class="k">def</span> <span class="nf">getCheckpointDir</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Return the directory where RDDs are checkpointed. Returns None if no</span> |
| <span class="sd"> checkpoint directory has been set.</span> |
| |
| <span class="sd"> .. versionadded:: 3.1.0</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.setCheckpointDir`</span> |
| <span class="sd"> :meth:`RDD.checkpoint`</span> |
| <span class="sd"> :meth:`RDD.getCheckpointFile`</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">getCheckpointDir</span><span class="p">()</span><span class="o">.</span><span class="n">isEmpty</span><span class="p">():</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">getCheckpointDir</span><span class="p">()</span><span class="o">.</span><span class="n">get</span><span class="p">()</span> |
| <span class="k">return</span> <span class="kc">None</span></div> |
| |
| <span class="k">def</span> <span class="nf">_getJavaStorageLevel</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">storageLevel</span><span class="p">:</span> <span class="n">StorageLevel</span><span class="p">)</span> <span class="o">-></span> <span class="n">JavaObject</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Returns a Java StorageLevel based on a pyspark.StorageLevel.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">storageLevel</span><span class="p">,</span> <span class="n">StorageLevel</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">"storageLevel must be of type pyspark.StorageLevel"</span><span class="p">)</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">newStorageLevel</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">storage</span><span class="o">.</span><span class="n">StorageLevel</span> |
| <span class="k">return</span> <span class="n">newStorageLevel</span><span class="p">(</span> |
| <span class="n">storageLevel</span><span class="o">.</span><span class="n">useDisk</span><span class="p">,</span> |
| <span class="n">storageLevel</span><span class="o">.</span><span class="n">useMemory</span><span class="p">,</span> |
| <span class="n">storageLevel</span><span class="o">.</span><span class="n">useOffHeap</span><span class="p">,</span> |
| <span class="n">storageLevel</span><span class="o">.</span><span class="n">deserialized</span><span class="p">,</span> |
| <span class="n">storageLevel</span><span class="o">.</span><span class="n">replication</span><span class="p">,</span> |
| <span class="p">)</span> |
| |
| <div class="viewcode-block" id="SparkContext.setJobGroup"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.setJobGroup.html#pyspark.SparkContext.setJobGroup">[docs]</a> <span class="k">def</span> <span class="nf">setJobGroup</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">groupId</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">description</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">interruptOnCancel</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Assigns a group ID to all the jobs started by this thread until the group ID is set to a</span> |
| <span class="sd"> different value or cleared.</span> |
| |
| <span class="sd"> Often, a unit of execution in an application consists of multiple Spark actions or jobs.</span> |
| <span class="sd"> Application programmers can use this method to group all those jobs together and give a</span> |
| <span class="sd"> group description. Once set, the Spark web UI will associate such jobs with this group.</span> |
| |
| <span class="sd"> The application can use :meth:`SparkContext.cancelJobGroup` to cancel all</span> |
| <span class="sd"> running jobs in this group.</span> |
| |
| <span class="sd"> .. versionadded:: 1.0.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> groupId : str</span> |
| <span class="sd"> The group ID to assign.</span> |
| <span class="sd"> description : str</span> |
| <span class="sd"> The description to set for the job group.</span> |
| <span class="sd"> interruptOnCancel : bool, optional, default False</span> |
| <span class="sd"> whether to interrupt jobs on job cancellation.</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> If interruptOnCancel is set to true for the job group, then job cancellation will result</span> |
| <span class="sd"> in Thread.interrupt() being called on the job's executor threads. This is useful to help</span> |
| <span class="sd"> ensure that the tasks are actually stopped in a timely manner, but is off by default due</span> |
| <span class="sd"> to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.</span> |
| |
| <span class="sd"> If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread</span> |
| <span class="sd"> local inheritance.</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.cancelJobGroup`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import threading</span> |
| <span class="sd"> >>> from time import sleep</span> |
| <span class="sd"> >>> from pyspark import InheritableThread</span> |
| <span class="sd"> >>> result = "Not Set"</span> |
| <span class="sd"> >>> lock = threading.Lock()</span> |
| <span class="sd"> >>> def map_func(x):</span> |
| <span class="sd"> ... sleep(100)</span> |
| <span class="sd"> ... raise RuntimeError("Task should have been cancelled")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> >>> def start_job(x):</span> |
| <span class="sd"> ... global result</span> |
| <span class="sd"> ... try:</span> |
| <span class="sd"> ... sc.setJobGroup("job_to_cancel", "some description")</span> |
| <span class="sd"> ... result = sc.parallelize(range(x)).map(map_func).collect()</span> |
| <span class="sd"> ... except Exception as e:</span> |
| <span class="sd"> ... result = "Cancelled"</span> |
| <span class="sd"> ... lock.release()</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> >>> def stop_job():</span> |
| <span class="sd"> ... sleep(5)</span> |
| <span class="sd"> ... sc.cancelJobGroup("job_to_cancel")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> >>> suppress = lock.acquire()</span> |
| <span class="sd"> >>> suppress = InheritableThread(target=start_job, args=(10,)).start()</span> |
| <span class="sd"> >>> suppress = InheritableThread(target=stop_job).start()</span> |
| <span class="sd"> >>> suppress = lock.acquire()</span> |
| <span class="sd"> >>> print(result)</span> |
| <span class="sd"> Cancelled</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">setJobGroup</span><span class="p">(</span><span class="n">groupId</span><span class="p">,</span> <span class="n">description</span><span class="p">,</span> <span class="n">interruptOnCancel</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.setInterruptOnCancel"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.setInterruptOnCancel.html#pyspark.SparkContext.setInterruptOnCancel">[docs]</a> <span class="k">def</span> <span class="nf">setInterruptOnCancel</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">interruptOnCancel</span><span class="p">:</span> <span class="nb">bool</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Set the behavior of job cancellation from jobs started in this thread.</span> |
| |
| <span class="sd"> .. versionadded:: 3.5.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> interruptOnCancel : bool</span> |
| <span class="sd"> If true, then job cancellation will result in ``Thread.interrupt()``</span> |
| <span class="sd"> being called on the job's executor threads. This is useful to help ensure that</span> |
| <span class="sd"> the tasks are actually stopped in a timely manner, but is off by default due to</span> |
| <span class="sd"> HDFS-1208, where HDFS may respond to ``Thread.interrupt()`` by marking nodes as dead.</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.addJobTag`</span> |
| <span class="sd"> :meth:`SparkContext.removeJobTag`</span> |
| <span class="sd"> :meth:`SparkContext.cancelAllJobs`</span> |
| <span class="sd"> :meth:`SparkContext.cancelJobGroup`</span> |
| <span class="sd"> :meth:`SparkContext.cancelJobsWithTag`</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">setInterruptOnCancel</span><span class="p">(</span><span class="n">interruptOnCancel</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.addJobTag"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.addJobTag.html#pyspark.SparkContext.addJobTag">[docs]</a> <span class="k">def</span> <span class="nf">addJobTag</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tag</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Add a tag to be assigned to all the jobs started by this thread.</span> |
| |
| <span class="sd"> .. versionadded:: 3.5.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> tag : str</span> |
| <span class="sd"> The tag to be added. Cannot contain ',' (comma) character.</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.removeJobTag`</span> |
| <span class="sd"> :meth:`SparkContext.getJobTags`</span> |
| <span class="sd"> :meth:`SparkContext.clearJobTags`</span> |
| <span class="sd"> :meth:`SparkContext.cancelJobsWithTag`</span> |
| <span class="sd"> :meth:`SparkContext.setInterruptOnCancel`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> import threading</span> |
| <span class="sd"> >>> from time import sleep</span> |
| <span class="sd"> >>> from pyspark import InheritableThread</span> |
| <span class="sd"> >>> sc.setInterruptOnCancel(interruptOnCancel=True)</span> |
| <span class="sd"> >>> result = "Not Set"</span> |
| <span class="sd"> >>> lock = threading.Lock()</span> |
| <span class="sd"> >>> def map_func(x):</span> |
| <span class="sd"> ... sleep(100)</span> |
| <span class="sd"> ... raise RuntimeError("Task should have been cancelled")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> >>> def start_job(x):</span> |
| <span class="sd"> ... global result</span> |
| <span class="sd"> ... try:</span> |
| <span class="sd"> ... sc.addJobTag("job_to_cancel")</span> |
| <span class="sd"> ... result = sc.parallelize(range(x)).map(map_func).collect()</span> |
| <span class="sd"> ... except Exception as e:</span> |
| <span class="sd"> ... result = "Cancelled"</span> |
| <span class="sd"> ... lock.release()</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> >>> def stop_job():</span> |
| <span class="sd"> ... sleep(5)</span> |
| <span class="sd"> ... sc.cancelJobsWithTag("job_to_cancel")</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> >>> suppress = lock.acquire()</span> |
| <span class="sd"> >>> suppress = InheritableThread(target=start_job, args=(10,)).start()</span> |
| <span class="sd"> >>> suppress = InheritableThread(target=stop_job).start()</span> |
| <span class="sd"> >>> suppress = lock.acquire()</span> |
| <span class="sd"> >>> print(result)</span> |
| <span class="sd"> Cancelled</span> |
| <span class="sd"> >>> sc.clearJobTags()</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">addJobTag</span><span class="p">(</span><span class="n">tag</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.removeJobTag"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.removeJobTag.html#pyspark.SparkContext.removeJobTag">[docs]</a> <span class="k">def</span> <span class="nf">removeJobTag</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tag</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Remove a tag previously added to be assigned to all the jobs started by this thread.</span> |
| <span class="sd"> No-op if such a tag was not added earlier.</span> |
| |
| <span class="sd"> .. versionadded:: 3.5.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> tag : str</span> |
| <span class="sd"> The tag to be removed. Cannot contain ',' (comma) character.</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.addJobTag`</span> |
| <span class="sd"> :meth:`SparkContext.getJobTags`</span> |
| <span class="sd"> :meth:`SparkContext.clearJobTags`</span> |
| <span class="sd"> :meth:`SparkContext.cancelJobsWithTag`</span> |
| <span class="sd"> :meth:`SparkContext.setInterruptOnCancel`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> sc.addJobTag("job_to_cancel1")</span> |
| <span class="sd"> >>> sc.addJobTag("job_to_cancel2")</span> |
| <span class="sd"> >>> sc.getJobTags()</span> |
| <span class="sd"> {'job_to_cancel1', 'job_to_cancel2'}</span> |
| <span class="sd"> >>> sc.removeJobTag("job_to_cancel1")</span> |
| <span class="sd"> >>> sc.getJobTags()</span> |
| <span class="sd"> {'job_to_cancel2'}</span> |
| <span class="sd"> >>> sc.clearJobTags()</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">removeJobTag</span><span class="p">(</span><span class="n">tag</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.getJobTags"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.getJobTags.html#pyspark.SparkContext.getJobTags">[docs]</a> <span class="k">def</span> <span class="nf">getJobTags</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="n">Set</span><span class="p">[</span><span class="nb">str</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Get the tags that are currently set to be assigned to all the jobs started by this thread.</span> |
| |
| <span class="sd"> .. versionadded:: 3.5.0</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> set of str</span> |
| <span class="sd"> the tags that are currently set to be assigned to all the jobs started by this thread.</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.addJobTag`</span> |
| <span class="sd"> :meth:`SparkContext.removeJobTag`</span> |
| <span class="sd"> :meth:`SparkContext.clearJobTags`</span> |
| <span class="sd"> :meth:`SparkContext.cancelJobsWithTag`</span> |
| <span class="sd"> :meth:`SparkContext.setInterruptOnCancel`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> sc.addJobTag("job_to_cancel")</span> |
| <span class="sd"> >>> sc.getJobTags()</span> |
| <span class="sd"> {'job_to_cancel'}</span> |
| <span class="sd"> >>> sc.clearJobTags()</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">getJobTags</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.clearJobTags"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.clearJobTags.html#pyspark.SparkContext.clearJobTags">[docs]</a> <span class="k">def</span> <span class="nf">clearJobTags</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Clear the current thread's job tags.</span> |
| |
| <span class="sd"> .. versionadded:: 3.5.0</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.addJobTag`</span> |
| <span class="sd"> :meth:`SparkContext.removeJobTag`</span> |
| <span class="sd"> :meth:`SparkContext.getJobTags`</span> |
| <span class="sd"> :meth:`SparkContext.cancelJobsWithTag`</span> |
| <span class="sd"> :meth:`SparkContext.setInterruptOnCancel`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> sc.addJobTag("job_to_cancel")</span> |
| <span class="sd"> >>> sc.clearJobTags()</span> |
| <span class="sd"> >>> sc.getJobTags()</span> |
| <span class="sd"> set()</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">clearJobTags</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.setLocalProperty"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.setLocalProperty.html#pyspark.SparkContext.setLocalProperty">[docs]</a> <span class="k">def</span> <span class="nf">setLocalProperty</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">value</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Set a local property that affects jobs submitted from this thread, such as the</span> |
| <span class="sd"> Spark fair scheduler pool.</span> |
| |
| <span class="sd"> .. versionadded:: 1.0.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> key : str</span> |
| <span class="sd"> The key of the local property to set.</span> |
| <span class="sd"> value : str</span> |
| <span class="sd"> The value of the local property to set.</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.getLocalProperty`</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread</span> |
| <span class="sd"> local inheritance.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">setLocalProperty</span><span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.getLocalProperty"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.getLocalProperty.html#pyspark.SparkContext.getLocalProperty">[docs]</a> <span class="k">def</span> <span class="nf">getLocalProperty</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Get a local property set in this thread, or None if it is missing. See</span> |
| <span class="sd"> :meth:`setLocalProperty`.</span> |
| |
| <span class="sd"> .. versionadded:: 1.0.0</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.setLocalProperty`</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">getLocalProperty</span><span class="p">(</span><span class="n">key</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.setJobDescription"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.setJobDescription.html#pyspark.SparkContext.setJobDescription">[docs]</a> <span class="k">def</span> <span class="nf">setJobDescription</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Set a human readable description of the current job.</span> |
| |
| <span class="sd"> .. versionadded:: 2.3.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> value : str</span> |
| <span class="sd"> The job description to set.</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread</span> |
| <span class="sd"> local inheritance.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">setJobDescription</span><span class="p">(</span><span class="n">value</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.sparkUser"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.sparkUser.html#pyspark.SparkContext.sparkUser">[docs]</a> <span class="k">def</span> <span class="nf">sparkUser</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="nb">str</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Get the SPARK_USER of the user who is running this SparkContext.</span> |
| |
| <span class="sd"> .. versionadded:: 1.0.0</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">sparkUser</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.cancelJobGroup"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.cancelJobGroup.html#pyspark.SparkContext.cancelJobGroup">[docs]</a> <span class="k">def</span> <span class="nf">cancelJobGroup</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">groupId</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Cancel active jobs for the specified group. See :meth:`SparkContext.setJobGroup`</span> |
| <span class="sd"> for more information.</span> |
| |
| <span class="sd"> .. versionadded:: 1.1.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> groupId : str</span> |
| <span class="sd"> The ID of the job group whose active jobs should be cancelled.</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.setJobGroup`</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">cancelJobGroup</span><span class="p">(</span><span class="n">groupId</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.cancelJobsWithTag"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.cancelJobsWithTag.html#pyspark.SparkContext.cancelJobsWithTag">[docs]</a> <span class="k">def</span> <span class="nf">cancelJobsWithTag</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tag</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Cancel active jobs that have the specified tag. See</span> |
| <span class="sd"> :meth:`SparkContext.addJobTag`.</span> |
| |
| <span class="sd"> .. versionadded:: 3.5.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> tag : str</span> |
| <span class="sd"> The tag to be cancelled. Cannot contain ',' (comma) character.</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.addJobTag`</span> |
| <span class="sd"> :meth:`SparkContext.removeJobTag`</span> |
| <span class="sd"> :meth:`SparkContext.getJobTags`</span> |
| <span class="sd"> :meth:`SparkContext.clearJobTags`</span> |
| <span class="sd"> :meth:`SparkContext.setInterruptOnCancel`</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">cancelJobsWithTag</span><span class="p">(</span><span class="n">tag</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.cancelAllJobs"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.cancelAllJobs.html#pyspark.SparkContext.cancelAllJobs">[docs]</a> <span class="k">def</span> <span class="nf">cancelAllJobs</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Cancel all jobs that have been scheduled or are running.</span> |
| |
| <span class="sd"> .. versionadded:: 1.1.0</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.cancelJobGroup`</span> |
| <span class="sd"> :meth:`SparkContext.cancelJobsWithTag`</span> |
| <span class="sd"> :meth:`SparkContext.runJob`</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">()</span><span class="o">.</span><span class="n">cancelAllJobs</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.statusTracker"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.statusTracker.html#pyspark.SparkContext.statusTracker">[docs]</a> <span class="k">def</span> <span class="nf">statusTracker</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="n">StatusTracker</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Return a :class:`StatusTracker` object.</span> |
| |
| <span class="sd"> .. versionadded:: 1.4.0</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="n">StatusTracker</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">statusTracker</span><span class="p">())</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.runJob"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.runJob.html#pyspark.SparkContext.runJob">[docs]</a> <span class="k">def</span> <span class="nf">runJob</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">rdd</span><span class="p">:</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">],</span> |
| <span class="n">partitionFunc</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">Iterable</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">U</span><span class="p">]],</span> |
| <span class="n">partitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Sequence</span><span class="p">[</span><span class="nb">int</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">allowLocal</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">List</span><span class="p">[</span><span class="n">U</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Executes the given partitionFunc on the specified set of partitions,</span> |
| <span class="sd"> returning the result as an array of elements.</span> |
| |
| <span class="sd"> If 'partitions' is not specified, this will run over all partitions.</span> |
| |
| <span class="sd"> .. versionadded:: 1.1.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> rdd : :class:`RDD`</span> |
| <span class="sd"> target RDD to run tasks on</span> |
| <span class="sd"> partitionFunc : function</span> |
| <span class="sd"> a function to run on each partition of the RDD</span> |
| <span class="sd"> partitions : list, optional</span> |
| <span class="sd"> set of partitions to run on; some jobs may not want to compute on all</span> |
| <span class="sd"> partitions of the target RDD, e.g. for operations like `first`</span> |
| <span class="sd"> allowLocal : bool, default False</span> |
| <span class="sd"> this parameter has no effect</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> list</span> |
| <span class="sd"> results of specified partitions</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.cancelAllJobs`</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> myRDD = sc.parallelize(range(6), 3)</span> |
| <span class="sd"> >>> sc.runJob(myRDD, lambda part: [x * x for x in part])</span> |
| <span class="sd"> [0, 1, 4, 9, 16, 25]</span> |
| |
| <span class="sd"> >>> myRDD = sc.parallelize(range(6), 3)</span> |
| <span class="sd"> >>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)</span> |
| <span class="sd"> [0, 1, 16, 25]</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">partitions</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">partitions</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="nb">range</span><span class="p">(</span><span class="n">rdd</span><span class="o">.</span><span class="n">_jrdd</span><span class="o">.</span><span class="n">partitions</span><span class="p">()</span><span class="o">.</span><span class="n">size</span><span class="p">()))</span> |
| |
| <span class="c1"># Implementation note: This is implemented as a mapPartitions followed</span> |
| <span class="c1"># by runJob() in order to avoid having to pass a Python lambda into</span> |
| <span class="c1"># SparkContext#runJob.</span> |
| <span class="n">mappedRDD</span> <span class="o">=</span> <span class="n">rdd</span><span class="o">.</span><span class="n">mapPartitions</span><span class="p">(</span><span class="n">partitionFunc</span><span class="p">)</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">sock_info</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonRDD</span><span class="o">.</span><span class="n">runJob</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">sc</span><span class="p">(),</span> <span class="n">mappedRDD</span><span class="o">.</span><span class="n">_jrdd</span><span class="p">,</span> <span class="n">partitions</span><span class="p">)</span> |
| <span class="k">return</span> <span class="nb">list</span><span class="p">(</span><span class="n">_load_from_socket</span><span class="p">(</span><span class="n">sock_info</span><span class="p">,</span> <span class="n">mappedRDD</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">))</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.show_profiles"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.show_profiles.html#pyspark.SparkContext.show_profiles">[docs]</a> <span class="k">def</span> <span class="nf">show_profiles</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""Print the profile stats to stdout</span> |
| |
| <span class="sd"> .. versionadded:: 1.2.0</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.dump_profiles`</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">profiler_collector</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">profiler_collector</span><span class="o">.</span><span class="n">show_profiles</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span> |
| <span class="n">error_class</span><span class="o">=</span><span class="s2">"INCORRECT_CONF_FOR_PROFILE"</span><span class="p">,</span> |
| <span class="n">message_parameters</span><span class="o">=</span><span class="p">{},</span> |
| <span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.dump_profiles"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.dump_profiles.html#pyspark.SparkContext.dump_profiles">[docs]</a> <span class="k">def</span> <span class="nf">dump_profiles</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""Dump the profile stats into directory `path`</span> |
| |
| <span class="sd"> .. versionadded:: 1.2.0</span> |
| |
| <span class="sd"> See Also</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> :meth:`SparkContext.show_profiles`</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">profiler_collector</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">profiler_collector</span><span class="o">.</span><span class="n">dump_profiles</span><span class="p">(</span><span class="n">path</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span> |
| <span class="n">error_class</span><span class="o">=</span><span class="s2">"INCORRECT_CONF_FOR_PROFILE"</span><span class="p">,</span> |
| <span class="n">message_parameters</span><span class="o">=</span><span class="p">{},</span> |
| <span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="SparkContext.getConf"><a class="viewcode-back" href="../../reference/api/pyspark.SparkContext.getConf.html#pyspark.SparkContext.getConf">[docs]</a> <span class="k">def</span> <span class="nf">getConf</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="n">SparkConf</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""Return a copy of this SparkContext's configuration :class:`SparkConf`.</span> |
| |
| <span class="sd"> .. versionadded:: 2.1.0</span> |
| <span class="sd"> """</span> |
| <span class="n">conf</span> <span class="o">=</span> <span class="n">SparkConf</span><span class="p">()</span> |
| <span class="n">conf</span><span class="o">.</span><span class="n">setAll</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_conf</span><span class="o">.</span><span class="n">getAll</span><span class="p">())</span> |
| <span class="k">return</span> <span class="n">conf</span></div> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">resources</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">ResourceInformation</span><span class="p">]:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Return the resource information of this :class:`SparkContext`.</span> |
| <span class="sd"> A resource could be a GPU, FPGA, etc.</span> |
| |
| <span class="sd"> .. versionadded:: 3.0.0</span> |
| <span class="sd"> """</span> |
| <span class="n">resources</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">jresources</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsc</span><span class="o">.</span><span class="n">resources</span><span class="p">()</span> |
| <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">jresources</span><span class="p">:</span> |
| <span class="n">name</span> <span class="o">=</span> <span class="n">jresources</span><span class="p">[</span><span class="n">x</span><span class="p">]</span><span class="o">.</span><span class="n">name</span><span class="p">()</span> |
| <span class="n">jaddresses</span> <span class="o">=</span> <span class="n">jresources</span><span class="p">[</span><span class="n">x</span><span class="p">]</span><span class="o">.</span><span class="n">addresses</span><span class="p">()</span> |
| <span class="n">addrs</span> <span class="o">=</span> <span class="p">[</span><span class="n">addr</span> <span class="k">for</span> <span class="n">addr</span> <span class="ow">in</span> <span class="n">jaddresses</span><span class="p">]</span> |
| <span class="n">resources</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">ResourceInformation</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">addrs</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">resources</span> |
| |
| <span class="nd">@staticmethod</span> |
| <span class="k">def</span> <span class="nf">_assert_on_driver</span><span class="p">()</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Called to ensure that SparkContext is created only on the Driver.</span> |
| |
| <span class="sd"> Throws an exception if a SparkContext is about to be created in executors.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">TaskContext</span><span class="o">.</span><span class="n">get</span><span class="p">()</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">PySparkRuntimeError</span><span class="p">(</span> |
| <span class="n">error_class</span><span class="o">=</span><span class="s2">"CONTEXT_ONLY_VALID_ON_DRIVER"</span><span class="p">,</span> |
| <span class="n">message_parameters</span><span class="o">=</span><span class="p">{},</span> |
| <span class="p">)</span></div> |
| |
| |
| <span class="k">def</span> <span class="nf">_test</span><span class="p">()</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="kn">import</span> <span class="nn">doctest</span> |
| <span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkConf</span> |
| |
| <span class="n">globs</span> <span class="o">=</span> <span class="nb">globals</span><span class="p">()</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span> |
| <span class="n">conf</span> <span class="o">=</span> <span class="n">SparkConf</span><span class="p">()</span><span class="o">.</span><span class="n">set</span><span class="p">(</span><span class="s2">"spark.ui.enabled"</span><span class="p">,</span> <span class="s2">"True"</span><span class="p">)</span> |
| <span class="n">globs</span><span class="p">[</span><span class="s2">"sc"</span><span class="p">]</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="s2">"local[4]"</span><span class="p">,</span> <span class="s2">"context tests"</span><span class="p">,</span> <span class="n">conf</span><span class="o">=</span><span class="n">conf</span><span class="p">)</span> |
| <span class="p">(</span><span class="n">failure_count</span><span class="p">,</span> <span class="n">test_count</span><span class="p">)</span> <span class="o">=</span> <span class="n">doctest</span><span class="o">.</span><span class="n">testmod</span><span class="p">(</span><span class="n">globs</span><span class="o">=</span><span class="n">globs</span><span class="p">,</span> <span class="n">optionflags</span><span class="o">=</span><span class="n">doctest</span><span class="o">.</span><span class="n">ELLIPSIS</span><span class="p">)</span> |
| <span class="n">globs</span><span class="p">[</span><span class="s2">"sc"</span><span class="p">]</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span> |
| <span class="k">if</span> <span class="n">failure_count</span><span class="p">:</span> |
| <span class="n">sys</span><span class="o">.</span><span class="n">exit</span><span class="p">(</span><span class="o">-</span><span class="mi">1</span><span class="p">)</span> |
| |
| |
| <span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">"__main__"</span><span class="p">:</span> |
| <span class="n">_test</span><span class="p">()</span> |
| </pre></div> |
| |
| </div> |
| |
| |
| <!-- Previous / next buttons --> |
| <div class='prev-next-area'> |
| </div> |
| |
| </main> |
| |
| |
| </div> |
| </div> |
| |
| <script src="../../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf"></script> |
| <footer class="footer mt-5 mt-md-0"> |
| <div class="container"> |
| |
| <div class="footer-item"> |
| <p class="copyright"> |
| © Copyright .<br> |
| </p> |
| </div> |
| |
| <div class="footer-item"> |
| <p class="sphinx-version"> |
| Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 3.0.4.<br> |
| </p> |
| </div> |
| |
| </div> |
| </footer> |
| </body> |
| </html> |