Source code for pyspark.streaming.context

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Any</span><span class="p">,</span> <span class="n">Callable</span><span class="p">,</span> <span class="n">List</span><span class="p">,</span> <span class="n">Optional</span><span class="p">,</span> <span class="n">TypeVar</span>
<span class="kn">from</span> <span class="nn">py4j.java_gateway</span> <span class="kn">import</span> <span class="n">java_import</span><span class="p">,</span> <span class="n">is_instance_of</span><span class="p">,</span> <span class="n">JavaObject</span>
<span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">RDD</span><span class="p">,</span> <span class="n">SparkConf</span>
<span class="kn">from</span> <span class="nn">pyspark.serializers</span> <span class="kn">import</span> <span class="n">NoOpSerializer</span><span class="p">,</span> <span class="n">UTF8Deserializer</span><span class="p">,</span> <span class="n">CloudPickleSerializer</span>
<span class="kn">from</span> <span class="nn">pyspark.core.context</span> <span class="kn">import</span> <span class="n">SparkContext</span>
<span class="kn">from</span> <span class="nn">pyspark.storagelevel</span> <span class="kn">import</span> <span class="n">StorageLevel</span>
<span class="kn">from</span> <span class="nn">pyspark.streaming.dstream</span> <span class="kn">import</span> <span class="n">DStream</span>
<span class="kn">from</span> <span class="nn">pyspark.streaming.listener</span> <span class="kn">import</span> <span class="n">StreamingListener</span>
<span class="kn">from</span> <span class="nn">pyspark.streaming.util</span> <span class="kn">import</span> <span class="n">TransformFunction</span><span class="p">,</span> <span class="n">TransformFunctionSerializer</span>
<span class="kn">import</span> <span class="nn">warnings</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s2">&quot;StreamingContext&quot;</span><span class="p">]</span>
<span class="n">T</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s2">&quot;T&quot;</span><span class="p">)</span>
<div class="viewcode-block" id="StreamingContext"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.html#pyspark.streaming.StreamingContext">[docs]</a><span class="k">class</span> <span class="nc">StreamingContext</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Main entry point for Spark Streaming functionality. A StreamingContext</span>
<span class="sd"> represents the connection to a Spark cluster, and can be used to create</span>
<span class="sd"> :class:`DStream` various input sources. It can be from an existing :class:`SparkContext`.</span>
<span class="sd"> After creating and transforming DStreams, the streaming computation can</span>
<span class="sd"> be started and stopped using `context.start()` and `context.stop()`,</span>
<span class="sd"> respectively. `context.awaitTermination()` allows the current thread</span>
<span class="sd"> to wait for the termination of the context by `stop()` or by an exception.</span>
<span class="sd"> .. deprecated:: Spark 3.4.0</span>
<span class="sd"> This is deprecated as of Spark 3.4.0.</span>
<span class="sd"> There are no longer updates to DStream and it&#39;s a legacy project.</span>
<span class="sd"> There is a newer and easier to use streaming engine in Spark called Structured Streaming.</span>
<span class="sd"> You should use Spark Structured Streaming for your streaming applications.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> sparkContext : :class:`SparkContext`</span>
<span class="sd"> SparkContext object.</span>
<span class="sd"> batchDuration : int, optional</span>
<span class="sd"> the time interval (in seconds) at which streaming</span>
<span class="sd"> data will be divided into batches</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">_transformerSerializer</span> <span class="o">=</span> <span class="kc">None</span>
<span class="c1"># Reference to a currently active StreamingContext</span>
<span class="n">_activeContext</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">sparkContext</span><span class="p">:</span> <span class="n">SparkContext</span><span class="p">,</span>
<span class="n">batchDuration</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">jssc</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">JavaObject</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">):</span>
<span class="n">warnings</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span>
<span class="s2">&quot;DStream is deprecated as of Spark 3.4.0. Migrate to Structured Streaming.&quot;</span><span class="p">,</span>
<span class="ne">FutureWarning</span><span class="p">,</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sc</span> <span class="o">=</span> <span class="n">sparkContext</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span> <span class="o">=</span> <span class="n">jssc</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">_initialize_context</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="p">,</span> <span class="n">batchDuration</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_initialize_context</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">sc</span><span class="p">:</span> <span class="n">SparkContext</span><span class="p">,</span> <span class="n">duration</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">JavaObject</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_ensure_initialized</span><span class="p">()</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">duration</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">JavaStreamingContext</span><span class="p">(</span><span class="n">sc</span><span class="o">.</span><span class="n">_jsc</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jduration</span><span class="p">(</span><span class="n">duration</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">_jduration</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">seconds</span><span class="p">:</span> <span class="nb">int</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">JavaObject</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Create Duration object given number of seconds</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">Duration</span><span class="p">(</span><span class="nb">int</span><span class="p">(</span><span class="n">seconds</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">))</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_ensure_initialized</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">SparkContext</span><span class="o">.</span><span class="n">_ensure_initialized</span><span class="p">()</span>
<span class="n">gw</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_gateway</span>
<span class="k">assert</span> <span class="n">gw</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">java_import</span><span class="p">(</span><span class="n">gw</span><span class="o">.</span><span class="n">jvm</span><span class="p">,</span> <span class="s2">&quot;org.apache.spark.streaming.*&quot;</span><span class="p">)</span>
<span class="n">java_import</span><span class="p">(</span><span class="n">gw</span><span class="o">.</span><span class="n">jvm</span><span class="p">,</span> <span class="s2">&quot;org.apache.spark.streaming.api.java.*&quot;</span><span class="p">)</span>
<span class="n">java_import</span><span class="p">(</span><span class="n">gw</span><span class="o">.</span><span class="n">jvm</span><span class="p">,</span> <span class="s2">&quot;org.apache.spark.streaming.api.python.*&quot;</span><span class="p">)</span>
<span class="kn">from</span> <span class="nn">pyspark.java_gateway</span> <span class="kn">import</span> <span class="n">ensure_callback_server_started</span>
<span class="n">ensure_callback_server_started</span><span class="p">(</span><span class="n">gw</span><span class="p">)</span>
<span class="c1"># register serializer for TransformFunction</span>
<span class="c1"># it happens before creating SparkContext when loading from checkpointing</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_transformerSerializer</span> <span class="o">=</span> <span class="n">TransformFunctionSerializer</span><span class="p">(</span>
<span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span><span class="p">,</span>
<span class="n">CloudPickleSerializer</span><span class="p">(),</span>
<span class="n">gw</span><span class="p">,</span>
<span class="p">)</span>
<div class="viewcode-block" id="StreamingContext.getOrCreate"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.getOrCreate.html#pyspark.streaming.StreamingContext.getOrCreate">[docs]</a> <span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">getOrCreate</span><span class="p">(</span>
<span class="bp">cls</span><span class="p">,</span> <span class="n">checkpointPath</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">setupFunc</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[],</span> <span class="s2">&quot;StreamingContext&quot;</span><span class="p">]</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;StreamingContext&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.</span>
<span class="sd"> If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be</span>
<span class="sd"> recreated from the checkpoint data. If the data does not exist, then the provided setupFunc</span>
<span class="sd"> will be used to create a new context.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> checkpointPath : str</span>
<span class="sd"> Checkpoint directory used in an earlier streaming program</span>
<span class="sd"> setupFunc : function</span>
<span class="sd"> Function to create a new context and setup DStreams</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_ensure_initialized</span><span class="p">()</span>
<span class="n">gw</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_gateway</span>
<span class="k">assert</span> <span class="n">gw</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="c1"># Check whether valid checkpoint information exists in the given path</span>
<span class="n">ssc_option</span> <span class="o">=</span> <span class="n">gw</span><span class="o">.</span><span class="n">jvm</span><span class="o">.</span><span class="n">StreamingContextPythonHelper</span><span class="p">()</span><span class="o">.</span><span class="n">tryRecoverFromCheckpoint</span><span class="p">(</span><span class="n">checkpointPath</span><span class="p">)</span>
<span class="k">if</span> <span class="n">ssc_option</span><span class="o">.</span><span class="n">isEmpty</span><span class="p">():</span>
<span class="n">ssc</span> <span class="o">=</span> <span class="n">setupFunc</span><span class="p">()</span>
<span class="n">ssc</span><span class="o">.</span><span class="n">checkpoint</span><span class="p">(</span><span class="n">checkpointPath</span><span class="p">)</span>
<span class="k">return</span> <span class="n">ssc</span>
<span class="n">jssc</span> <span class="o">=</span> <span class="n">gw</span><span class="o">.</span><span class="n">jvm</span><span class="o">.</span><span class="n">JavaStreamingContext</span><span class="p">(</span><span class="n">ssc_option</span><span class="o">.</span><span class="n">get</span><span class="p">())</span>
<span class="c1"># If there is already an active instance of Python SparkContext use it, or create a new one</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span><span class="p">:</span>
<span class="n">jsc</span> <span class="o">=</span> <span class="n">jssc</span><span class="o">.</span><span class="n">sparkContext</span><span class="p">()</span>
<span class="n">conf</span> <span class="o">=</span> <span class="n">SparkConf</span><span class="p">(</span><span class="n">_jconf</span><span class="o">=</span><span class="n">jsc</span><span class="o">.</span><span class="n">getConf</span><span class="p">())</span>
<span class="n">SparkContext</span><span class="p">(</span><span class="n">conf</span><span class="o">=</span><span class="n">conf</span><span class="p">,</span> <span class="n">gateway</span><span class="o">=</span><span class="n">gw</span><span class="p">,</span> <span class="n">jsc</span><span class="o">=</span><span class="n">jsc</span><span class="p">)</span>
<span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_active_spark_context</span>
<span class="k">assert</span> <span class="n">sc</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="c1"># update ctx in serializer</span>
<span class="k">assert</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_transformerSerializer</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_transformerSerializer</span><span class="o">.</span><span class="n">ctx</span> <span class="o">=</span> <span class="n">sc</span>
<span class="k">return</span> <span class="n">StreamingContext</span><span class="p">(</span><span class="n">sc</span><span class="p">,</span> <span class="kc">None</span><span class="p">,</span> <span class="n">jssc</span><span class="p">)</span></div>
<div class="viewcode-block" id="StreamingContext.getActive"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.getActive.html#pyspark.streaming.StreamingContext.getActive">[docs]</a> <span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">getActive</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="s2">&quot;StreamingContext&quot;</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return either the currently active StreamingContext (i.e., if there is a context started</span>
<span class="sd"> but not stopped) or None.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">activePythonContext</span> <span class="o">=</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_activeContext</span>
<span class="k">if</span> <span class="n">activePythonContext</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># Verify that the current running Java StreamingContext is active and is the same one</span>
<span class="c1"># backing the supposedly active Python context</span>
<span class="n">activePythonContextJavaId</span> <span class="o">=</span> <span class="n">activePythonContext</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">ssc</span><span class="p">()</span><span class="o">.</span><span class="n">hashCode</span><span class="p">()</span>
<span class="n">activeJvmContextOption</span> <span class="o">=</span> <span class="n">activePythonContext</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">StreamingContext</span><span class="o">.</span><span class="n">getActive</span><span class="p">()</span>
<span class="k">if</span> <span class="n">activeJvmContextOption</span><span class="o">.</span><span class="n">isEmpty</span><span class="p">():</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_activeContext</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">elif</span> <span class="n">activeJvmContextOption</span><span class="o">.</span><span class="n">get</span><span class="p">()</span><span class="o">.</span><span class="n">hashCode</span><span class="p">()</span> <span class="o">!=</span> <span class="n">activePythonContextJavaId</span><span class="p">:</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_activeContext</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span>
<span class="s2">&quot;JVM&#39;s active JavaStreamingContext is not the JavaStreamingContext &quot;</span>
<span class="s2">&quot;backing the action Python StreamingContext. This is unexpected.&quot;</span>
<span class="p">)</span>
<span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_activeContext</span></div>
<div class="viewcode-block" id="StreamingContext.getActiveOrCreate"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.getActiveOrCreate.html#pyspark.streaming.StreamingContext.getActiveOrCreate">[docs]</a> <span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">getActiveOrCreate</span><span class="p">(</span>
<span class="bp">cls</span><span class="p">,</span> <span class="n">checkpointPath</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">setupFunc</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[],</span> <span class="s2">&quot;StreamingContext&quot;</span><span class="p">]</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;StreamingContext&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Either return the active StreamingContext (i.e. currently started but not stopped),</span>
<span class="sd"> or recreate a StreamingContext from checkpoint data or create a new StreamingContext</span>
<span class="sd"> using the provided setupFunc function. If the checkpointPath is None or does not contain</span>
<span class="sd"> valid checkpoint data, then setupFunc will be called to create a new context and setup</span>
<span class="sd"> DStreams.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> checkpointPath : str</span>
<span class="sd"> Checkpoint directory used in an earlier streaming program. Can be</span>
<span class="sd"> None if the intention is to always create a new context when there</span>
<span class="sd"> is no active context.</span>
<span class="sd"> setupFunc : function</span>
<span class="sd"> Function to create a new JavaStreamingContext and setup DStreams</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">callable</span><span class="p">(</span><span class="n">setupFunc</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;setupFunc should be callable.&quot;</span><span class="p">)</span>
<span class="n">activeContext</span> <span class="o">=</span> <span class="bp">cls</span><span class="o">.</span><span class="n">getActive</span><span class="p">()</span>
<span class="k">if</span> <span class="n">activeContext</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">activeContext</span>
<span class="k">elif</span> <span class="n">checkpointPath</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">getOrCreate</span><span class="p">(</span><span class="n">checkpointPath</span><span class="p">,</span> <span class="n">setupFunc</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">setupFunc</span><span class="p">()</span></div>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">sparkContext</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">SparkContext</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return SparkContext which is associated with this StreamingContext.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span>
<div class="viewcode-block" id="StreamingContext.start"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.start.html#pyspark.streaming.StreamingContext.start">[docs]</a> <span class="k">def</span> <span class="nf">start</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Start the execution of the streams.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
<span class="n">StreamingContext</span><span class="o">.</span><span class="n">_activeContext</span> <span class="o">=</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="StreamingContext.awaitTermination"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.awaitTermination.html#pyspark.streaming.StreamingContext.awaitTermination">[docs]</a> <span class="k">def</span> <span class="nf">awaitTermination</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timeout</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Wait for the execution to stop.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> timeout : int, optional</span>
<span class="sd"> time to wait in seconds</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">timeout</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">awaitTermination</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">awaitTerminationOrTimeout</span><span class="p">(</span><span class="nb">int</span><span class="p">(</span><span class="n">timeout</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">))</span></div>
<div class="viewcode-block" id="StreamingContext.awaitTerminationOrTimeout"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.awaitTerminationOrTimeout.html#pyspark.streaming.StreamingContext.awaitTerminationOrTimeout">[docs]</a> <span class="k">def</span> <span class="nf">awaitTerminationOrTimeout</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timeout</span><span class="p">:</span> <span class="nb">int</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Wait for the execution to stop. Return `true` if it&#39;s stopped; or</span>
<span class="sd"> throw the reported error during the execution; or `false` if the</span>
<span class="sd"> waiting time elapsed before returning from the method.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> timeout : int</span>
<span class="sd"> time to wait in seconds</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">awaitTerminationOrTimeout</span><span class="p">(</span><span class="nb">int</span><span class="p">(</span><span class="n">timeout</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">))</span></div>
<div class="viewcode-block" id="StreamingContext.stop"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.stop.html#pyspark.streaming.StreamingContext.stop">[docs]</a> <span class="k">def</span> <span class="nf">stop</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">stopSparkContext</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">True</span><span class="p">,</span> <span class="n">stopGraceFully</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Stop the execution of the streams, with option of ensuring all</span>
<span class="sd"> received data has been processed.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> stopSparkContext : bool, optional</span>
<span class="sd"> Stop the associated SparkContext or not</span>
<span class="sd"> stopGracefully : bool, optional</span>
<span class="sd"> Stop gracefully by waiting for the processing of all received</span>
<span class="sd"> data to be completed</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">stop</span><span class="p">(</span><span class="n">stopSparkContext</span><span class="p">,</span> <span class="n">stopGraceFully</span><span class="p">)</span>
<span class="n">StreamingContext</span><span class="o">.</span><span class="n">_activeContext</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">stopSparkContext</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span></div>
<div class="viewcode-block" id="StreamingContext.remember"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.remember.html#pyspark.streaming.StreamingContext.remember">[docs]</a> <span class="k">def</span> <span class="nf">remember</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">duration</span><span class="p">:</span> <span class="nb">int</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Set each DStreams in this context to remember RDDs it generated</span>
<span class="sd"> in the last given duration. DStreams remember RDDs only for a</span>
<span class="sd"> limited duration of time and releases them for garbage collection.</span>
<span class="sd"> This method allows the developer to specify how long to remember</span>
<span class="sd"> the RDDs (if the developer wishes to query old data outside the</span>
<span class="sd"> DStream computation).</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> duration : int</span>
<span class="sd"> Minimum duration (in seconds) that each DStream should remember its RDDs</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">remember</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jduration</span><span class="p">(</span><span class="n">duration</span><span class="p">))</span></div>
<div class="viewcode-block" id="StreamingContext.checkpoint"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.checkpoint.html#pyspark.streaming.StreamingContext.checkpoint">[docs]</a> <span class="k">def</span> <span class="nf">checkpoint</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">directory</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Sets the context to periodically checkpoint the DStream operations for master</span>
<span class="sd"> fault-tolerance. The graph will be checkpointed every batch interval.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> directory : str</span>
<span class="sd"> HDFS-compatible directory where the checkpoint data will be reliably stored</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">checkpoint</span><span class="p">(</span><span class="n">directory</span><span class="p">)</span></div>
<div class="viewcode-block" id="StreamingContext.socketTextStream"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.socketTextStream.html#pyspark.streaming.StreamingContext.socketTextStream">[docs]</a> <span class="k">def</span> <span class="nf">socketTextStream</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">hostname</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">port</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> <span class="n">storageLevel</span><span class="p">:</span> <span class="n">StorageLevel</span> <span class="o">=</span> <span class="n">StorageLevel</span><span class="o">.</span><span class="n">MEMORY_AND_DISK_2</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[str]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Create an input from TCP source hostname:port. Data is received using</span>
<span class="sd"> a TCP socket and receive byte is interpreted as UTF8 encoded ``\\n`` delimited</span>
<span class="sd"> lines.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> hostname : str</span>
<span class="sd"> Hostname to connect to for receiving data</span>
<span class="sd"> port : int</span>
<span class="sd"> Port to connect to for receiving data</span>
<span class="sd"> storageLevel : :class:`pyspark.StorageLevel`, optional</span>
<span class="sd"> Storage level to use for storing the received objects</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">jlevel</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_getJavaStorageLevel</span><span class="p">(</span><span class="n">storageLevel</span><span class="p">)</span>
<span class="k">return</span> <span class="n">DStream</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">socketTextStream</span><span class="p">(</span><span class="n">hostname</span><span class="p">,</span> <span class="n">port</span><span class="p">,</span> <span class="n">jlevel</span><span class="p">),</span> <span class="bp">self</span><span class="p">,</span> <span class="n">UTF8Deserializer</span><span class="p">()</span>
<span class="p">)</span></div>
<div class="viewcode-block" id="StreamingContext.textFileStream"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.textFileStream.html#pyspark.streaming.StreamingContext.textFileStream">[docs]</a> <span class="k">def</span> <span class="nf">textFileStream</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">directory</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[str]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Create an input stream that monitors a Hadoop-compatible file system</span>
<span class="sd"> for new files and reads them as text files. Files must be written to the</span>
<span class="sd"> monitored directory by &quot;moving&quot; them from another location within the same</span>
<span class="sd"> file system. File names starting with . are ignored.</span>
<span class="sd"> The text files must be encoded as UTF-8.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">DStream</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">textFileStream</span><span class="p">(</span><span class="n">directory</span><span class="p">),</span> <span class="bp">self</span><span class="p">,</span> <span class="n">UTF8Deserializer</span><span class="p">())</span></div>
<div class="viewcode-block" id="StreamingContext.binaryRecordsStream"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.binaryRecordsStream.html#pyspark.streaming.StreamingContext.binaryRecordsStream">[docs]</a> <span class="k">def</span> <span class="nf">binaryRecordsStream</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">directory</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">recordLength</span><span class="p">:</span> <span class="nb">int</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[bytes]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Create an input stream that monitors a Hadoop-compatible file system</span>
<span class="sd"> for new files and reads them as flat binary files with records of</span>
<span class="sd"> fixed length. Files must be written to the monitored directory by &quot;moving&quot;</span>
<span class="sd"> them from another location within the same file system.</span>
<span class="sd"> File names starting with . are ignored.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> directory : str</span>
<span class="sd"> Directory to load data from</span>
<span class="sd"> recordLength : int</span>
<span class="sd"> Length of each record in bytes</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">DStream</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">binaryRecordsStream</span><span class="p">(</span><span class="n">directory</span><span class="p">,</span> <span class="n">recordLength</span><span class="p">),</span> <span class="bp">self</span><span class="p">,</span> <span class="n">NoOpSerializer</span><span class="p">()</span>
<span class="p">)</span></div>
<span class="k">def</span> <span class="nf">_check_serializers</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">rdds</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]])</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># make sure they have same serializer</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="nb">set</span><span class="p">(</span><span class="n">rdd</span><span class="o">.</span><span class="n">_jrdd_deserializer</span> <span class="k">for</span> <span class="n">rdd</span> <span class="ow">in</span> <span class="n">rdds</span><span class="p">))</span> <span class="o">&gt;</span> <span class="mi">1</span><span class="p">:</span>
<span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="nb">len</span><span class="p">(</span><span class="n">rdds</span><span class="p">)):</span>
<span class="c1"># reset them to sc.serializer</span>
<span class="n">rdds</span><span class="p">[</span><span class="n">i</span><span class="p">]</span> <span class="o">=</span> <span class="n">rdds</span><span class="p">[</span><span class="n">i</span><span class="p">]</span><span class="o">.</span><span class="n">_reserialize</span><span class="p">()</span>
<div class="viewcode-block" id="StreamingContext.queueStream"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.queueStream.html#pyspark.streaming.StreamingContext.queueStream">[docs]</a> <span class="k">def</span> <span class="nf">queueStream</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">rdds</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span>
<span class="n">oneAtATime</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">True</span><span class="p">,</span>
<span class="n">default</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Create an input stream from a queue of RDDs or list. In each batch,</span>
<span class="sd"> it will process either one or all of the RDDs returned by the queue.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> rdds : list</span>
<span class="sd"> Queue of RDDs</span>
<span class="sd"> oneAtATime : bool, optional</span>
<span class="sd"> pick one rdd each time or pick all of them once.</span>
<span class="sd"> default : :class:`pyspark.RDD`, optional</span>
<span class="sd"> The default rdd if no more in rdds</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> Changes to the queue after the stream is created will not be recognized.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">default</span> <span class="ow">and</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">default</span><span class="p">,</span> <span class="n">RDD</span><span class="p">):</span>
<span class="n">default</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="n">default</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">rdds</span> <span class="ow">and</span> <span class="n">default</span><span class="p">:</span>
<span class="n">rdds</span> <span class="o">=</span> <span class="p">[</span><span class="n">rdds</span><span class="p">]</span> <span class="c1"># type: ignore[list-item]</span>
<span class="k">if</span> <span class="n">rdds</span> <span class="ow">and</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">rdds</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">RDD</span><span class="p">):</span>
<span class="n">rdds</span> <span class="o">=</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="nb">input</span><span class="p">)</span> <span class="k">for</span> <span class="nb">input</span> <span class="ow">in</span> <span class="n">rdds</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_check_serializers</span><span class="p">(</span><span class="n">rdds</span><span class="p">)</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">queue</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonDStream</span><span class="o">.</span><span class="n">toRDDQueue</span><span class="p">([</span><span class="n">r</span><span class="o">.</span><span class="n">_jrdd</span> <span class="k">for</span> <span class="n">r</span> <span class="ow">in</span> <span class="n">rdds</span><span class="p">])</span>
<span class="k">if</span> <span class="n">default</span><span class="p">:</span>
<span class="n">default</span> <span class="o">=</span> <span class="n">default</span><span class="o">.</span><span class="n">_reserialize</span><span class="p">(</span><span class="n">rdds</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">)</span>
<span class="k">assert</span> <span class="n">default</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">jdstream</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">queueStream</span><span class="p">(</span><span class="n">queue</span><span class="p">,</span> <span class="n">oneAtATime</span><span class="p">,</span> <span class="n">default</span><span class="o">.</span><span class="n">_jrdd</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">jdstream</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">queueStream</span><span class="p">(</span><span class="n">queue</span><span class="p">,</span> <span class="n">oneAtATime</span><span class="p">)</span>
<span class="k">return</span> <span class="n">DStream</span><span class="p">(</span><span class="n">jdstream</span><span class="p">,</span> <span class="bp">self</span><span class="p">,</span> <span class="n">rdds</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">)</span></div>
<div class="viewcode-block" id="StreamingContext.transform"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.transform.html#pyspark.streaming.StreamingContext.transform">[docs]</a> <span class="k">def</span> <span class="nf">transform</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">dstreams</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="s2">&quot;DStream[Any]&quot;</span><span class="p">],</span> <span class="n">transformFunc</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[</span><span class="o">...</span><span class="p">,</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]]</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Create a new DStream in which each RDD is generated by applying</span>
<span class="sd"> a function on RDDs of the DStreams. The order of the JavaRDDs in</span>
<span class="sd"> the transform function parameter will be the same as the order</span>
<span class="sd"> of corresponding DStreams in the list.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">jdstreams</span> <span class="o">=</span> <span class="p">[</span><span class="n">d</span><span class="o">.</span><span class="n">_jdstream</span> <span class="k">for</span> <span class="n">d</span> <span class="ow">in</span> <span class="n">dstreams</span><span class="p">]</span>
<span class="c1"># change the final serializer to sc.serializer</span>
<span class="n">func</span> <span class="o">=</span> <span class="n">TransformFunction</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="p">,</span>
<span class="k">lambda</span> <span class="n">t</span><span class="p">,</span> <span class="o">*</span><span class="n">rdds</span><span class="p">:</span> <span class="n">transformFunc</span><span class="p">(</span><span class="n">rdds</span><span class="p">),</span>
<span class="o">*</span><span class="p">[</span><span class="n">d</span><span class="o">.</span><span class="n">_jrdd_deserializer</span> <span class="k">for</span> <span class="n">d</span> <span class="ow">in</span> <span class="n">dstreams</span><span class="p">],</span>
<span class="p">)</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">jfunc</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">TransformFunction</span><span class="p">(</span><span class="n">func</span><span class="p">)</span>
<span class="n">jdstream</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">transform</span><span class="p">(</span><span class="n">jdstreams</span><span class="p">,</span> <span class="n">jfunc</span><span class="p">)</span>
<span class="k">return</span> <span class="n">DStream</span><span class="p">(</span><span class="n">jdstream</span><span class="p">,</span> <span class="bp">self</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">serializer</span><span class="p">)</span></div>
<div class="viewcode-block" id="StreamingContext.union"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.union.html#pyspark.streaming.StreamingContext.union">[docs]</a> <span class="k">def</span> <span class="nf">union</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">dstreams</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Create a unified DStream from multiple DStreams of the same</span>
<span class="sd"> type and same slide duration.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">dstreams</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;should have at least one DStream to union&quot;</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">dstreams</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="k">return</span> <span class="n">dstreams</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="nb">set</span><span class="p">(</span><span class="n">s</span><span class="o">.</span><span class="n">_jrdd_deserializer</span> <span class="k">for</span> <span class="n">s</span> <span class="ow">in</span> <span class="n">dstreams</span><span class="p">))</span> <span class="o">&gt;</span> <span class="mi">1</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;All DStreams should have same serializer&quot;</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="nb">set</span><span class="p">(</span><span class="n">s</span><span class="o">.</span><span class="n">_slideDuration</span> <span class="k">for</span> <span class="n">s</span> <span class="ow">in</span> <span class="n">dstreams</span><span class="p">))</span> <span class="o">&gt;</span> <span class="mi">1</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;All DStreams should have same slide duration&quot;</span><span class="p">)</span>
<span class="k">assert</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">jdstream_cls</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">streaming</span><span class="o">.</span><span class="n">api</span><span class="o">.</span><span class="n">java</span><span class="o">.</span><span class="n">JavaDStream</span>
<span class="n">jpair_dstream_cls</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">streaming</span><span class="o">.</span><span class="n">api</span><span class="o">.</span><span class="n">java</span><span class="o">.</span><span class="n">JavaPairDStream</span>
<span class="n">gw</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="o">.</span><span class="n">_gateway</span>
<span class="k">if</span> <span class="n">is_instance_of</span><span class="p">(</span><span class="n">gw</span><span class="p">,</span> <span class="n">dstreams</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">_jdstream</span><span class="p">,</span> <span class="n">jdstream_cls</span><span class="p">):</span>
<span class="bp">cls</span> <span class="o">=</span> <span class="n">jdstream_cls</span>
<span class="k">elif</span> <span class="n">is_instance_of</span><span class="p">(</span><span class="n">gw</span><span class="p">,</span> <span class="n">dstreams</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">_jdstream</span><span class="p">,</span> <span class="n">jpair_dstream_cls</span><span class="p">):</span>
<span class="bp">cls</span> <span class="o">=</span> <span class="n">jpair_dstream_cls</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">cls_name</span> <span class="o">=</span> <span class="n">dstreams</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">getClass</span><span class="p">()</span><span class="o">.</span><span class="n">getCanonicalName</span><span class="p">()</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;Unsupported Java DStream class </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">cls_name</span><span class="p">)</span>
<span class="k">assert</span> <span class="n">gw</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">jdstreams</span> <span class="o">=</span> <span class="n">gw</span><span class="o">.</span><span class="n">new_array</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">dstreams</span><span class="p">))</span>
<span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">dstreams</span><span class="p">)):</span>
<span class="n">jdstreams</span><span class="p">[</span><span class="n">i</span><span class="p">]</span> <span class="o">=</span> <span class="n">dstreams</span><span class="p">[</span><span class="n">i</span><span class="p">]</span><span class="o">.</span><span class="n">_jdstream</span>
<span class="k">return</span> <span class="n">DStream</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">union</span><span class="p">(</span><span class="n">jdstreams</span><span class="p">),</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">dstreams</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">,</span>
<span class="p">)</span></div>
<div class="viewcode-block" id="StreamingContext.addStreamingListener"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.StreamingContext.addStreamingListener.html#pyspark.streaming.StreamingContext.addStreamingListener">[docs]</a> <span class="k">def</span> <span class="nf">addStreamingListener</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">streamingListener</span><span class="p">:</span> <span class="n">StreamingListener</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for</span>
<span class="sd"> receiving system events related to streaming.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jssc</span><span class="o">.</span><span class="n">addStreamingListener</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">JavaStreamingListenerWrapper</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonStreamingListenerWrapper</span><span class="p">(</span><span class="n">streamingListener</span><span class="p">)</span>
<span class="p">)</span>
<span class="p">)</span></div></div>
</pre></div>
</article>
<footer class="bd-footer-article">
<div class="footer-article-items footer-article__inner">
<div class="footer-article-item"><!-- Previous / next buttons -->
<div class="prev-next-area">
</div></div>
</div>
</footer>
</div>
</div>
<footer class="bd-footer-content">
</footer>
</main>
</div>
</div>
<!-- Scripts loaded after <body> so the DOM is not blocked -->
<script src="../../../_static/scripts/bootstrap.js?digest=e353d410970836974a52"></script>
<script src="../../../_static/scripts/pydata-sphinx-theme.js?digest=e353d410970836974a52"></script>
<footer class="bd-footer">
<div class="bd-footer__inner bd-page-width">
<div class="footer-items__start">
<div class="footer-item"><p class="copyright">
Copyright © 2024 The Apache Software Foundation, licensed under the <a href="https://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
</p></div>
<div class="footer-item">
<p class="sphinx-version">
Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 4.5.0.
<br/>
</p>
</div>
</div>
<div class="footer-items__end">
<div class="footer-item"><p class="theme-version">
Built with the <a href="https://pydata-sphinx-theme.readthedocs.io/en/stable/index.html">PyData Sphinx Theme</a> 0.13.3.
</p></div>
</div>
</div>
</footer>
</body>
</html>