blob: 3fc1ca622d9a7973632f6d621e4e23ade1e14d1c [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<title>pyspark.streaming.dstream &#8212; PySpark 3.5.5 documentation</title>
<link href="../../../_static/styles/theme.css?digest=1999514e3f237ded88cf" rel="stylesheet">
<link href="../../../_static/styles/pydata-sphinx-theme.css?digest=1999514e3f237ded88cf" rel="stylesheet">
<link rel="stylesheet"
href="../../../_static/vendor/fontawesome/5.13.0/css/all.min.css">
<link rel="preload" as="font" type="font/woff2" crossorigin
href="../../../_static/vendor/fontawesome/5.13.0/webfonts/fa-solid-900.woff2">
<link rel="preload" as="font" type="font/woff2" crossorigin
href="../../../_static/vendor/fontawesome/5.13.0/webfonts/fa-brands-400.woff2">
<link rel="stylesheet" href="../../../_static/styles/pydata-sphinx-theme.css" type="text/css" />
<link rel="stylesheet" href="../../../_static/pygments.css" type="text/css" />
<link rel="stylesheet" type="text/css" href="../../../_static/copybutton.css" />
<link rel="stylesheet" type="text/css" href="../../../_static/css/pyspark.css" />
<link rel="preload" as="script" href="../../../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf">
<script id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script>
<script src="../../../_static/jquery.js"></script>
<script src="../../../_static/underscore.js"></script>
<script src="../../../_static/doctools.js"></script>
<script src="../../../_static/language_data.js"></script>
<script src="../../../_static/clipboard.min.js"></script>
<script src="../../../_static/copybutton.js"></script>
<script crossorigin="anonymous" integrity="sha256-Ae2Vz/4ePdIu6ZyI/5ZGsYnb+m0JlOmKPjt6XZ9JJkA=" src="https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.4/require.min.js"></script>
<script async="async" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
<script type="text/x-mathjax-config">MathJax.Hub.Config({"tex2jax": {"inlineMath": [["$", "$"], ["\\(", "\\)"]], "processEscapes": true, "ignoreClass": "document", "processClass": "math|output_area"}})</script>
<link rel="canonical" href="https://spark.apache.org/docs/latest/api/python/_modules/pyspark/streaming/dstream.html" />
<link rel="search" title="Search" href="../../../search.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="en">
<!-- Google Analytics -->
</head>
<body data-spy="scroll" data-target="#bd-toc-nav" data-offset="80">
<div class="container-fluid" id="banner"></div>
<nav class="navbar navbar-light navbar-expand-lg bg-light fixed-top bd-navbar" id="navbar-main"><div class="container-xl">
<div id="navbar-start">
<a class="navbar-brand" href="../../../index.html">
<img src="../../../_static/spark-logo-reverse.png" class="logo" alt="PySpark documentation home">
</a>
</div>
<button class="navbar-toggler" type="button" data-toggle="collapse" data-target="#navbar-collapsible" aria-controls="navbar-collapsible" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div id="navbar-collapsible" class="col-lg-9 collapse navbar-collapse">
<div id="navbar-center" class="mr-auto">
<div class="navbar-center-item">
<ul id="navbar-main-elements" class="navbar-nav">
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../../index.html">
Overview
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../../user_guide/index.html">
User Guides
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../../reference/index.html">
API Reference
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../../development/index.html">
Development
</a>
</li>
<li class="toctree-l1 nav-item">
<a class="reference internal nav-link" href="../../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</div>
</div>
<div id="navbar-end">
<div class="navbar-end-item">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
<button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-toggle="dropdown">
3.5.5
<span class="caret"></span>
</button>
<div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
<!-- dropdown will be populated by javascript on page load -->
</div>
</div>
<script type="text/javascript">
// Build the documentation homepage URL for one entry of the versions JSON.
// The "{version}" placeholder in the template is replaced with entry.version.
function buildURL(entry) {
  var template = "https://spark.apache.org/docs/{version}/api/python/index.html"; // supplied by jinja
  return template.replace("{version}", entry.version);
}
// Function to check if corresponding page path exists in other version of docs
// and, if so, go there instead of the homepage of the other docs version.
//
// The href of each switcher link is the other version's homepage. That URL ends
// in "index.html" (see the template used to build it). Appending the page path
// to it verbatim produced ".../index.html_modules/...", a URL that never exists,
// so the HEAD probe always failed and every click fell back to the homepage.
// Resolving the path relative to the homepage URL drops the trailing
// "index.html", and it also works when the homepage ends in "/".
function checkPageExistsAndRedirect(event) {
  const currentFilePath = "_modules/pyspark/streaming/dstream.html",
    // currentTarget is the anchor the handler is bound to, even if the click
    // landed on a descendant element; fall back to target for older callers.
    link = event.currentTarget || event.target,
    otherDocsHomepage = link.getAttribute("href");
  // link.href is the absolute (resolved) form of the attribute, so it is
  // always a valid base for the URL constructor.
  const tryUrl = new URL(currentFilePath, link.href).href;
  $.ajax({
    type: 'HEAD',
    url: tryUrl,
    // if the page exists, go there
    success: function() {
      location.href = tryUrl;
    }
  }).fail(function() {
    // otherwise fall back to the other version's homepage
    location.href = otherDocsHomepage;
  });
  // cancel the anchor's default navigation; the AJAX callbacks decide where to go
  return false;
}
// Populate the version switcher dropdown from the published versions list.
(function () {
  // Nodes are created up front, in the order the JSON lists them, so the menu
  // order is stable; each link initially points at that version's docs homepage.
  $.getJSON("https://spark.apache.org/static/versions.json", function(data, textStatus, jqXHR) {
    var switcher = $("#version_switcher");
    $.each(data, function(index, entry) {
      // entries without a custom label (e.g. "latest") display their version string
      if (!("name" in entry)) {
        entry.name = entry.version;
      }
      entry.url = buildURL(entry);
      var link = document.createElement("a");
      link.setAttribute("class", "list-group-item list-group-item-action py-1");
      link.setAttribute("href", `${entry.url}`);
      link.textContent = `${entry.name}`;
      // clicking tries the same page in the chosen version before the homepage
      link.onclick = checkPageExistsAndRedirect;
      switcher.append(link);
    });
  });
})();
</script>
</div>
</div>
</div>
</div>
</nav>
<div class="container-xl">
<div class="row">
<!-- Only show if we have sidebars configured, else just a small margin -->
<div class="col-12 col-md-3 bd-sidebar">
<div class="sidebar-start-items"><form class="bd-search d-flex align-items-center" action="../../../search.html" method="get">
<i class="icon fas fa-search"></i>
<input type="search" class="form-control" name="q" id="search-input" placeholder="Search the docs ..." aria-label="Search the docs ..." autocomplete="off" >
</form><nav class="bd-links" id="bd-docs-nav" aria-label="Main navigation">
<div class="bd-toc-item active">
</div>
</nav>
</div>
<div class="sidebar-end-items">
</div>
</div>
<div class="d-none d-xl-block col-xl-2 bd-toc">
</div>
<main class="col-12 col-md-9 col-xl-7 py-md-5 pl-md-5 pr-md-4 bd-content" role="main">
<div>
<h1>Source code for pyspark.streaming.dstream</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">operator</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">time</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">itertools</span><span class="w"> </span><span class="kn">import</span> <span class="n">chain</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">datetime</span><span class="w"> </span><span class="kn">import</span> <span class="n">datetime</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">typing</span><span class="w"> </span><span class="kn">import</span> <span class="p">(</span>
<span class="n">Any</span><span class="p">,</span>
<span class="n">Callable</span><span class="p">,</span>
<span class="n">Generic</span><span class="p">,</span>
<span class="n">Hashable</span><span class="p">,</span>
<span class="n">Iterable</span><span class="p">,</span>
<span class="n">List</span><span class="p">,</span>
<span class="n">Optional</span><span class="p">,</span>
<span class="n">Tuple</span><span class="p">,</span>
<span class="n">TypeVar</span><span class="p">,</span>
<span class="n">Union</span><span class="p">,</span>
<span class="n">TYPE_CHECKING</span><span class="p">,</span>
<span class="n">cast</span><span class="p">,</span>
<span class="n">overload</span><span class="p">,</span>
<span class="p">)</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">py4j.protocol</span><span class="w"> </span><span class="kn">import</span> <span class="n">Py4JJavaError</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.storagelevel</span><span class="w"> </span><span class="kn">import</span> <span class="n">StorageLevel</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.streaming.util</span><span class="w"> </span><span class="kn">import</span> <span class="n">rddToFileName</span><span class="p">,</span> <span class="n">TransformFunction</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.rdd</span><span class="w"> </span><span class="kn">import</span> <span class="n">portable_hash</span><span class="p">,</span> <span class="n">RDD</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.resultiterable</span><span class="w"> </span><span class="kn">import</span> <span class="n">ResultIterable</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">py4j.java_gateway</span><span class="w"> </span><span class="kn">import</span> <span class="n">JavaObject</span>
<span class="k">if</span> <span class="n">TYPE_CHECKING</span><span class="p">:</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.serializers</span><span class="w"> </span><span class="kn">import</span> <span class="n">Serializer</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">pyspark.streaming.context</span><span class="w"> </span><span class="kn">import</span> <span class="n">StreamingContext</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s2">&quot;DStream&quot;</span><span class="p">]</span>
<span class="n">S</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s2">&quot;S&quot;</span><span class="p">)</span>
<span class="n">T</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s2">&quot;T&quot;</span><span class="p">)</span>
<span class="n">T_co</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s2">&quot;T_co&quot;</span><span class="p">,</span> <span class="n">covariant</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="n">U</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s2">&quot;U&quot;</span><span class="p">)</span>
<span class="n">K</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s2">&quot;K&quot;</span><span class="p">,</span> <span class="n">bound</span><span class="o">=</span><span class="n">Hashable</span><span class="p">)</span>
<span class="n">V</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s2">&quot;V&quot;</span><span class="p">)</span>
<div class="viewcode-block" id="DStream"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.html#pyspark.streaming.DStream">[docs]</a><span class="k">class</span><span class="w"> </span><span class="nc">DStream</span><span class="p">(</span><span class="n">Generic</span><span class="p">[</span><span class="n">T_co</span><span class="p">]):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A Discretized Stream (DStream), the basic abstraction in Spark Streaming,</span>
<span class="sd"> is a continuous sequence of RDDs (of the same type) representing a</span>
<span class="sd"> continuous stream of data (see :class:`RDD` in the Spark core documentation</span>
<span class="sd"> for more details on RDDs).</span>
<span class="sd"> DStreams can either be created from live data (such as, data from TCP</span>
<span class="sd"> sockets, etc.) using a :class:`StreamingContext` or it can be</span>
<span class="sd"> generated by transforming existing DStreams using operations such as</span>
<span class="sd"> `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming</span>
<span class="sd"> program is running, each DStream periodically generates a RDD, either</span>
<span class="sd"> from live data or by transforming the RDD generated by a parent DStream.</span>
<span class="sd"> DStreams internally is characterized by a few basic properties:</span>
<span class="sd"> - A list of other DStreams that the DStream depends on</span>
<span class="sd"> - A time interval at which the DStream generates an RDD</span>
<span class="sd"> - A function that is used to generate an RDD after each time interval</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">jdstream</span><span class="p">:</span> <span class="n">JavaObject</span><span class="p">,</span>
<span class="n">ssc</span><span class="p">:</span> <span class="s2">&quot;StreamingContext&quot;</span><span class="p">,</span>
<span class="n">jrdd_deserializer</span><span class="p">:</span> <span class="s2">&quot;Serializer&quot;</span><span class="p">,</span>
<span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jdstream</span> <span class="o">=</span> <span class="n">jdstream</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span> <span class="o">=</span> <span class="n">ssc</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sc</span> <span class="o">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">_sc</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jrdd_deserializer</span> <span class="o">=</span> <span class="n">jrdd_deserializer</span>
<span class="bp">self</span><span class="o">.</span><span class="n">is_cached</span> <span class="o">=</span> <span class="kc">False</span>
<span class="bp">self</span><span class="o">.</span><span class="n">is_checkpointed</span> <span class="o">=</span> <span class="kc">False</span>
<div class="viewcode-block" id="DStream.context"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.context.html#pyspark.streaming.DStream.context">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">context</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;StreamingContext&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return the StreamingContext associated with this DStream</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span></div>
<div class="viewcode-block" id="DStream.count"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.count.html#pyspark.streaming.DStream.count">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">count</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[int]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream in which each RDD has a single element</span>
<span class="sd"> generated by counting each RDD of this DStream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">mapPartitions</span><span class="p">(</span><span class="k">lambda</span> <span class="n">i</span><span class="p">:</span> <span class="p">[</span><span class="nb">sum</span><span class="p">(</span><span class="mi">1</span> <span class="k">for</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">i</span><span class="p">)])</span><span class="o">.</span><span class="n">reduce</span><span class="p">(</span><span class="n">operator</span><span class="o">.</span><span class="n">add</span><span class="p">)</span></div>
<div class="viewcode-block" id="DStream.filter"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.filter.html#pyspark.streaming.DStream.filter">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">filter</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span> <span class="n">f</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">T</span><span class="p">],</span> <span class="nb">bool</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream containing only the elements that satisfy predicate.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">func</span><span class="p">(</span><span class="n">iterator</span><span class="p">:</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">T</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">T</span><span class="p">]:</span>
<span class="k">return</span> <span class="nb">filter</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="n">iterator</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">mapPartitions</span><span class="p">(</span><span class="n">func</span><span class="p">,</span> <span class="kc">True</span><span class="p">)</span></div>
<div class="viewcode-block" id="DStream.flatMap"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.flatMap.html#pyspark.streaming.DStream.flatMap">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">flatMap</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span>
<span class="n">f</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">T</span><span class="p">],</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">U</span><span class="p">]],</span>
<span class="n">preservesPartitioning</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[U]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying a function to all elements of</span>
<span class="sd"> this DStream, and then flattening the results</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">func</span><span class="p">(</span><span class="n">s</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> <span class="n">iterator</span><span class="p">:</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">T</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">U</span><span class="p">]:</span>
<span class="k">return</span> <span class="n">chain</span><span class="o">.</span><span class="n">from_iterable</span><span class="p">(</span><span class="nb">map</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="n">iterator</span><span class="p">))</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">mapPartitionsWithIndex</span><span class="p">(</span><span class="n">func</span><span class="p">,</span> <span class="n">preservesPartitioning</span><span class="p">)</span></div>
<div class="viewcode-block" id="DStream.map"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.map.html#pyspark.streaming.DStream.map">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">map</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span> <span class="n">f</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">T</span><span class="p">],</span> <span class="n">U</span><span class="p">],</span> <span class="n">preservesPartitioning</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[U]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying a function to each element of DStream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">func</span><span class="p">(</span><span class="n">iterator</span><span class="p">:</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">T</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">U</span><span class="p">]:</span>
<span class="k">return</span> <span class="nb">map</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="n">iterator</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">mapPartitions</span><span class="p">(</span><span class="n">func</span><span class="p">,</span> <span class="n">preservesPartitioning</span><span class="p">)</span></div>
<div class="viewcode-block" id="DStream.mapPartitions"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.mapPartitions.html#pyspark.streaming.DStream.mapPartitions">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">mapPartitions</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span>
<span class="n">f</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">Iterable</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">U</span><span class="p">]],</span>
<span class="n">preservesPartitioning</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[U]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream in which each RDD is generated by applying</span>
<span class="sd"> mapPartitions() to each RDDs of this DStream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">func</span><span class="p">(</span><span class="n">s</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> <span class="n">iterator</span><span class="p">:</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">T</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">U</span><span class="p">]:</span>
<span class="k">return</span> <span class="n">f</span><span class="p">(</span><span class="n">iterator</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">mapPartitionsWithIndex</span><span class="p">(</span><span class="n">func</span><span class="p">,</span> <span class="n">preservesPartitioning</span><span class="p">)</span></div>
<div class="viewcode-block" id="DStream.mapPartitionsWithIndex"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.mapPartitionsWithIndex.html#pyspark.streaming.DStream.mapPartitionsWithIndex">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">mapPartitionsWithIndex</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span>
<span class="n">f</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="nb">int</span><span class="p">,</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">U</span><span class="p">]],</span>
<span class="n">preservesPartitioning</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[U]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream in which each RDD is generated by applying</span>
<span class="sd"> mapPartitionsWithIndex() to each RDDs of this DStream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform</span><span class="p">(</span><span class="k">lambda</span> <span class="n">rdd</span><span class="p">:</span> <span class="n">rdd</span><span class="o">.</span><span class="n">mapPartitionsWithIndex</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="n">preservesPartitioning</span><span class="p">))</span></div>
<div class="viewcode-block" id="DStream.reduce"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.reduce.html#pyspark.streaming.DStream.reduce">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span> <span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">T</span><span class="p">,</span> <span class="n">T</span><span class="p">],</span> <span class="n">T</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream in which each RDD has a single element</span>
<span class="sd"> generated by reducing each RDD of this DStream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">x</span><span class="p">))</span><span class="o">.</span><span class="n">reduceByKey</span><span class="p">(</span><span class="n">func</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span></div>
<!-- Source listing of DStream.reduceByKey: when numPartitions is None it falls back to self._sc.defaultParallelism, then delegates to self.combineByKey with an identity createCombiner and func used as both mergeValue and mergeCombiners. --><div class="viewcode-block" id="DStream.reduceByKey"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.reduceByKey.html#pyspark.streaming.DStream.reduceByKey">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">reduceByKey</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span>
<span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">V</span><span class="p">,</span> <span class="n">V</span><span class="p">],</span> <span class="n">V</span><span class="p">],</span>
<span class="n">numPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying reduceByKey to each RDD.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">numPartitions</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">numPartitions</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">defaultParallelism</span><!-- default partition count when the caller passes None -->
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">combineByKey</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">,</span> <span class="n">func</span><span class="p">,</span> <span class="n">func</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">)</span></div>
<!-- Source listing of DStream.combineByKey: resolves numPartitions (None means self._sc.defaultParallelism) and applies RDD.combineByKey with the three caller-supplied functions to every batch RDD via self.transform. --><div class="viewcode-block" id="DStream.combineByKey"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.combineByKey.html#pyspark.streaming.DStream.combineByKey">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">combineByKey</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span>
<span class="n">createCombiner</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">V</span><span class="p">],</span> <span class="n">U</span><span class="p">],</span>
<span class="n">mergeValue</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">U</span><span class="p">,</span> <span class="n">V</span><span class="p">],</span> <span class="n">U</span><span class="p">],</span>
<span class="n">mergeCombiners</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">U</span><span class="p">,</span> <span class="n">U</span><span class="p">],</span> <span class="n">U</span><span class="p">],</span>
<span class="n">numPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, U]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying combineByKey to each RDD.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">numPartitions</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">numPartitions</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">defaultParallelism</span>
<span class="k">def</span><span class="w"> </span><span class="nf">func</span><span class="p">(</span><span class="n">rdd</span><span class="p">:</span> <span class="n">RDD</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">V</span><span class="p">]])</span> <span class="o">-&gt;</span> <span class="n">RDD</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">U</span><span class="p">]]:</span><!-- per-batch closure handed to transform; it captures the three combiner functions and the resolved numPartitions -->
<span class="k">return</span> <span class="n">rdd</span><span class="o">.</span><span class="n">combineByKey</span><span class="p">(</span><span class="n">createCombiner</span><span class="p">,</span> <span class="n">mergeValue</span><span class="p">,</span> <span class="n">mergeCombiners</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform</span><span class="p">(</span><span class="n">func</span><span class="p">)</span></div>
<!-- Source listing of DStream.partitionBy: applies RDD.partitionBy(numPartitions, partitionFunc) to every batch RDD via self.transform; partitionFunc defaults to portable_hash. --><div class="viewcode-block" id="DStream.partitionBy"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.partitionBy.html#pyspark.streaming.DStream.partitionBy">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">partitionBy</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span>
<span class="n">numPartitions</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">partitionFunc</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">K</span><span class="p">],</span> <span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="n">portable_hash</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a copy of the DStream in which each RDD are partitioned</span><!-- NOTE(review): docstring grammar, "each RDD are partitioned" should read "each RDD is partitioned"; fix it in the upstream pyspark.streaming.dstream source so regenerated docs pick it up. -->
<span class="sd"> using the specified partitioner.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform</span><span class="p">(</span><span class="k">lambda</span> <span class="n">rdd</span><span class="p">:</span> <span class="n">rdd</span><span class="o">.</span><span class="n">partitionBy</span><span class="p">(</span><span class="n">numPartitions</span><span class="p">,</span> <span class="n">partitionFunc</span><span class="p">))</span></div>
<!-- Typing-only @overload stubs for foreachRDD: func may take (rdd) or (time, rdd) and returns None; the runtime implementation is the foreachRDD definition right after these stubs. --><span class="nd">@overload</span>
<span class="k">def</span><span class="w"> </span><span class="nf">foreachRDD</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span> <span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="kc">None</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="o">...</span>
<span class="nd">@overload</span>
<span class="k">def</span><span class="w"> </span><span class="nf">foreachRDD</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span> <span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">datetime</span><span class="p">,</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="kc">None</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="o">...</span>
<!-- Source listing of DStream.foreachRDD: a one-argument func is wrapped into a (time, rdd) function that ignores the time; the result is wrapped in TransformFunction and registered on the JVM side through PythonDStream.callForeachRDD. --><div class="viewcode-block" id="DStream.foreachRDD"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.foreachRDD.html#pyspark.streaming.DStream.foreachRDD">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">foreachRDD</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span>
<span class="n">func</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">Callable</span><span class="p">[[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="kc">None</span><span class="p">],</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">datetime</span><span class="p">,</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="kc">None</span><span class="p">]],</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Apply a function to each RDD in this DStream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">func</span><span class="o">.</span><span class="vm">__code__</span><span class="o">.</span><span class="n">co_argcount</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span><!-- NOTE(review): dispatch relies on func.__code__.co_argcount. Callables without __code__ (e.g. functools.partial objects) would raise AttributeError here, and for bound methods the count includes self. Confirm upstream whether such callables should be supported. -->
<span class="n">old_func</span> <span class="o">=</span> <span class="n">func</span>
<span class="k">def</span><span class="w"> </span><span class="nf">func</span><span class="p">(</span><span class="n">_</span><span class="p">:</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">rdd</span><span class="p">:</span> <span class="s2">&quot;RDD[T]&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span><!-- adapter: drops the batch time so a one-argument callback fits the (time, rdd) shape -->
<span class="k">return</span> <span class="n">old_func</span><span class="p">(</span><span class="n">rdd</span><span class="p">)</span> <span class="c1"># type: ignore[call-arg, arg-type]</span>
<span class="n">jfunc</span> <span class="o">=</span> <span class="n">TransformFunction</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="p">,</span> <span class="n">func</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">)</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><!-- the StreamingContext's JVM view must be available before registering the callback -->
<span class="n">api</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonDStream</span>
<span class="n">api</span><span class="o">.</span><span class="n">callForeachRDD</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jdstream</span><span class="p">,</span> <span class="n">jfunc</span><span class="p">)</span></div>
<!-- Source listing of DStream.pprint: for each batch, prints a header with the batch time and then up to num records (default 10); num + 1 records are taken so that a trailing "..." can signal that more records exist. --><div class="viewcode-block" id="DStream.pprint"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.pprint.html#pyspark.streaming.DStream.pprint">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">pprint</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">num</span><span class="p">:</span> <span class="nb">int</span> <span class="o">=</span> <span class="mi">10</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Print the first num elements of each RDD generated in this DStream.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> num : int, optional</span>
<span class="sd"> the number of elements from the first will be printed.</span><!-- NOTE(review): awkward upstream docstring wording; consider "the number of elements to print from the start of each RDD". -->
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">takeAndPrint</span><span class="p">(</span><span class="n">time</span><span class="p">:</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">rdd</span><span class="p">:</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">taken</span> <span class="o">=</span> <span class="n">rdd</span><span class="o">.</span><span class="n">take</span><span class="p">(</span><span class="n">num</span> <span class="o">+</span> <span class="mi">1</span><span class="p">)</span><!-- one extra record so the "more available" marker can be decided below -->
<span class="nb">print</span><span class="p">(</span><span class="s2">&quot;-------------------------------------------&quot;</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">&quot;Time: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">time</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">&quot;-------------------------------------------&quot;</span><span class="p">)</span>
<span class="k">for</span> <span class="n">record</span> <span class="ow">in</span> <span class="n">taken</span><span class="p">[:</span><span class="n">num</span><span class="p">]:</span>
<span class="nb">print</span><span class="p">(</span><span class="n">record</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">taken</span><span class="p">)</span> <span class="o">&gt;</span> <span class="n">num</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">&quot;...&quot;</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">&quot;&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">foreachRDD</span><span class="p">(</span><span class="n">takeAndPrint</span><span class="p">)</span></div>
<!-- Source listing of DStream.mapValues: applies f to the value of each (key, value) pair and keeps the key, so the map is issued with preservesPartitioning=True. --><div class="viewcode-block" id="DStream.mapValues"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.mapValues.html#pyspark.streaming.DStream.mapValues">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">mapValues</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span> <span class="n">f</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">V</span><span class="p">],</span> <span class="n">U</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, U]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying a map function to the value of</span>
<span class="sd"> each key-value pairs in this DStream without changing the key.</span><!-- NOTE(review): "each key-value pairs" should read "each key-value pair" in the upstream docstring. -->
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">map_values_fn</span><span class="p">(</span><span class="n">kv</span><span class="p">:</span> <span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">V</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">U</span><span class="p">]:</span>
<span class="k">return</span> <span class="n">kv</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">f</span><span class="p">(</span><span class="n">kv</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">map_values_fn</span><span class="p">,</span> <span class="n">preservesPartitioning</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div>
<!-- Source listing of DStream.flatMapValues: expands each (key, value) pair into (key, x) for every x produced by f(value); keys are unchanged, so flatMap is issued with preservesPartitioning=True. --><div class="viewcode-block" id="DStream.flatMapValues"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.flatMapValues.html#pyspark.streaming.DStream.flatMapValues">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">flatMapValues</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span> <span class="n">f</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">V</span><span class="p">],</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">U</span><span class="p">]]</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, U]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying a flatmap function to the value</span>
<span class="sd"> of each key-value pairs in this DStream without changing the key.</span><!-- NOTE(review): "each key-value pairs" should read "each key-value pair" in the upstream docstring. -->
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">flat_map_fn</span><span class="p">(</span><span class="n">kv</span><span class="p">:</span> <span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">V</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">U</span><span class="p">]]:</span>
<span class="k">return</span> <span class="p">((</span><span class="n">kv</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">x</span><span class="p">)</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">f</span><span class="p">(</span><span class="n">kv</span><span class="p">[</span><span class="mi">1</span><span class="p">]))</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">flatMap</span><span class="p">(</span><span class="n">flat_map_fn</span><span class="p">,</span> <span class="n">preservesPartitioning</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div>
<!-- Source listing of DStream.glom: uses mapPartitions to collapse each partition of every batch RDD into a single list element. --><div class="viewcode-block" id="DStream.glom"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.glom.html#pyspark.streaming.DStream.glom">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">glom</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[List[T]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream in which RDD is generated by applying glom()</span>
<span class="sd"> to RDD of this DStream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">func</span><span class="p">(</span><span class="n">iterator</span><span class="p">:</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">T</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">List</span><span class="p">[</span><span class="n">T</span><span class="p">]]:</span>
<span class="k">yield</span> <span class="nb">list</span><span class="p">(</span><span class="n">iterator</span><span class="p">)</span><!-- yields exactly one list per partition -->
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">mapPartitions</span><span class="p">(</span><span class="n">func</span><span class="p">)</span></div>
<!-- Source listing of DStream.cache: sets is_cached, persists with StorageLevel.MEMORY_ONLY and returns self so calls can be chained. --><div class="viewcode-block" id="DStream.cache"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.cache.html#pyspark.streaming.DStream.cache">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">cache</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Persist the RDDs of this DStream with the default storage level</span>
<span class="sd"> (`MEMORY_ONLY`).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">is_cached</span> <span class="o">=</span> <span class="kc">True</span>
<span class="bp">self</span><span class="o">.</span><span class="n">persist</span><span class="p">(</span><span class="n">StorageLevel</span><span class="o">.</span><span class="n">MEMORY_ONLY</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span></div>
<!-- Source listing of DStream.persist: sets is_cached, converts storageLevel to its Java counterpart via self._sc._getJavaStorageLevel, persists the underlying JVM DStream and returns self. --><div class="viewcode-block" id="DStream.persist"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.persist.html#pyspark.streaming.DStream.persist">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">persist</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span> <span class="n">storageLevel</span><span class="p">:</span> <span class="n">StorageLevel</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Persist the RDDs of this DStream with the given storage level</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">is_cached</span> <span class="o">=</span> <span class="kc">True</span>
<span class="n">javaStorageLevel</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_getJavaStorageLevel</span><span class="p">(</span><span class="n">storageLevel</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">persist</span><span class="p">(</span><span class="n">javaStorageLevel</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span></div>
<!-- Source listing of DStream.checkpoint: sets is_checkpointed, converts interval with self._ssc._jduration, calls checkpoint on the underlying JVM DStream and returns self. --><div class="viewcode-block" id="DStream.checkpoint"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.checkpoint.html#pyspark.streaming.DStream.checkpoint">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">checkpoint</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span> <span class="n">interval</span><span class="p">:</span> <span class="nb">int</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Enable periodic checkpointing of RDDs of this DStream</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> interval : int</span>
<span class="sd"> time in seconds, after each period of that, generated</span>
<span class="sd"> RDD will be checkpointed</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">is_checkpointed</span> <span class="o">=</span> <span class="kc">True</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">checkpoint</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span><span class="o">.</span><span class="n">_jduration</span><span class="p">(</span><span class="n">interval</span><span class="p">))</span><!-- interval is converted to a JVM duration by the StreamingContext helper before being passed to the JVM DStream -->
<span class="k">return</span> <span class="bp">self</span></div>
<!-- Source listing of DStream.groupByKey: resolves numPartitions (None means self._sc.defaultParallelism) and applies RDD.groupByKey to every batch RDD via self.transform. --><div class="viewcode-block" id="DStream.groupByKey"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.groupByKey.html#pyspark.streaming.DStream.groupByKey">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">groupByKey</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, Iterable[V]]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying groupByKey on each RDD.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">numPartitions</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">numPartitions</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">defaultParallelism</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform</span><span class="p">(</span><span class="k">lambda</span> <span class="n">rdd</span><span class="p">:</span> <span class="n">rdd</span><span class="o">.</span><span class="n">groupByKey</span><span class="p">(</span><span class="n">numPartitions</span><span class="p">))</span></div>
<!-- Source listing of DStream.countByValue: pairs every element with 1 and sums the counts per distinct element with reduceByKey, called here without an explicit numPartitions. --><div class="viewcode-block" id="DStream.countByValue"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.countByValue.html#pyspark.streaming.DStream.countByValue">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">countByValue</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[K]&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, int]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream in which each RDD contains the counts of each</span>
<span class="sd"> distinct value in each RDD of this DStream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="mi">1</span><span class="p">))</span><span class="o">.</span><span class="n">reduceByKey</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">:</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span><span class="p">)</span></div>
<!-- Source listing of DStream.saveAsTextFiles: for each batch, builds a path with rddToFileName(prefix, suffix, t) and calls RDD.saveAsTextFile on it. A Py4JJavaError whose text mentions FileAlreadyExistsException is ignored; any other error is re-raised. --><div class="viewcode-block" id="DStream.saveAsTextFiles"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.saveAsTextFiles.html#pyspark.streaming.DStream.saveAsTextFiles">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">saveAsTextFiles</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">prefix</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">suffix</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Save each RDD in this DStream as at text file, using string</span><!-- NOTE(review): "as at text file" should read "as a text file" in the upstream docstring. -->
<span class="sd"> representation of elements.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span><span class="w"> </span><span class="nf">saveAsTextFile</span><span class="p">(</span><span class="n">t</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">datetime</span><span class="p">],</span> <span class="n">rdd</span><span class="p">:</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">path</span> <span class="o">=</span> <span class="n">rddToFileName</span><span class="p">(</span><span class="n">prefix</span><span class="p">,</span> <span class="n">suffix</span><span class="p">,</span> <span class="n">t</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">rdd</span><span class="o">.</span><span class="n">saveAsTextFile</span><span class="p">(</span><span class="n">path</span><span class="p">)</span>
<span class="k">except</span> <span class="n">Py4JJavaError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="c1"># after recovered from checkpointing, the foreachRDD may</span>
<span class="c1"># be called twice</span>
<span class="k">if</span> <span class="s2">&quot;FileAlreadyExistsException&quot;</span> <span class="ow">not</span> <span class="ow">in</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">):</span><!-- the match is on the exception's string form, not its Java type -->
<span class="k">raise</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">foreachRDD</span><span class="p">(</span><span class="n">saveAsTextFile</span><span class="p">)</span></div>
<span class="c1"># TODO: uncomment this until we have ssc.pickleFileStream()</span>
<span class="c1"># def saveAsPickleFiles(self, prefix, suffix=None):</span>
<span class="c1"># &quot;&quot;&quot;</span>
<span class="c1"># Save each RDD in this DStream as at binary file, the elements are</span>
<span class="c1"># serialized by pickle.</span>
<span class="c1"># &quot;&quot;&quot;</span>
<span class="c1"># def saveAsPickleFile(t, rdd):</span>
<span class="c1"># path = rddToFileName(prefix, suffix, t)</span>
<span class="c1"># try:</span>
<span class="c1"># rdd.saveAsPickleFile(path)</span>
<span class="c1"># except Py4JJavaError as e:</span>
<span class="c1"># # after recovered from checkpointing, the foreachRDD may</span>
<span class="c1"># # be called twice</span>
<span class="c1"># if &#39;FileAlreadyExistsException&#39; not in str(e):</span>
<span class="c1"># raise</span>
<span class="c1"># return self.foreachRDD(saveAsPickleFile)</span>
<!-- Typing-only @overload stubs for transform: func may take (rdd) or (time, rdd) and must return an RDD; both forms yield a TransformedDStream. --><span class="nd">@overload</span>
<span class="k">def</span><span class="w"> </span><span class="nf">transform</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span> <span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]])</span> <span class="o">-&gt;</span> <span class="s2">&quot;TransformedDStream[U]&quot;</span><span class="p">:</span>
<span class="o">...</span>
<span class="nd">@overload</span>
<span class="k">def</span><span class="w"> </span><span class="nf">transform</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span> <span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">datetime</span><span class="p">,</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]]</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;TransformedDStream[U]&quot;</span><span class="p">:</span>
<span class="o">...</span>
<div class="viewcode-block" id="DStream.transform"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.transform.html#pyspark.streaming.DStream.transform">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">transform</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span>
<span class="n">func</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">Callable</span><span class="p">[[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]],</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">datetime</span><span class="p">,</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]]],</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;TransformedDStream[U]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream in which each RDD is generated by applying a function</span>
<span class="sd"> on each RDD of this DStream.</span>
<span class="sd"> `func` can have one argument of `rdd`, or have two arguments of</span>
<span class="sd"> (`time`, `rdd`)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">func</span><span class="o">.</span><span class="vm">__code__</span><span class="o">.</span><span class="n">co_argcount</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="n">oldfunc</span> <span class="o">=</span> <span class="n">func</span>
<span class="k">def</span><span class="w"> </span><span class="nf">func</span><span class="p">(</span><span class="n">_</span><span class="p">:</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">rdd</span><span class="p">:</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]:</span>
<span class="k">return</span> <span class="n">oldfunc</span><span class="p">(</span><span class="n">rdd</span><span class="p">)</span> <span class="c1"># type: ignore[arg-type, call-arg]</span>
<span class="k">assert</span> <span class="n">func</span><span class="o">.</span><span class="vm">__code__</span><span class="o">.</span><span class="n">co_argcount</span> <span class="o">==</span> <span class="mi">2</span><span class="p">,</span> <span class="s2">&quot;func should take one or two arguments&quot;</span>
<span class="k">return</span> <span class="n">TransformedDStream</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">func</span><span class="p">)</span></div>
<span class="nd">@overload</span>
<span class="k">def</span><span class="w"> </span><span class="nf">transformWith</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span>
<span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">V</span><span class="p">]],</span>
<span class="n">other</span><span class="p">:</span> <span class="s2">&quot;DStream[U]&quot;</span><span class="p">,</span>
<span class="n">keepSerializer</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="o">...</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[V]&quot;</span><span class="p">:</span>
<span class="o">...</span>
<span class="nd">@overload</span>
<span class="k">def</span><span class="w"> </span><span class="nf">transformWith</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span>
<span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">datetime</span><span class="p">,</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">V</span><span class="p">]],</span>
<span class="n">other</span><span class="p">:</span> <span class="s2">&quot;DStream[U]&quot;</span><span class="p">,</span>
<span class="n">keepSerializer</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="o">...</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[V]&quot;</span><span class="p">:</span>
<span class="o">...</span>
<div class="viewcode-block" id="DStream.transformWith"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.transformWith.html#pyspark.streaming.DStream.transformWith">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">transformWith</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span>
<span class="n">func</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span>
<span class="n">Callable</span><span class="p">[[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">V</span><span class="p">]],</span>
<span class="n">Callable</span><span class="p">[[</span><span class="n">datetime</span><span class="p">,</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">V</span><span class="p">]],</span>
<span class="p">],</span>
<span class="n">other</span><span class="p">:</span> <span class="s2">&quot;DStream[U]&quot;</span><span class="p">,</span>
<span class="n">keepSerializer</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[V]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream in which each RDD is generated by applying a function</span>
<span class="sd"> on each RDD of this DStream and &#39;other&#39; DStream.</span>
<span class="sd"> `func` can have two arguments of (`rdd_a`, `rdd_b`) or have three</span>
<span class="sd"> arguments of (`time`, `rdd_a`, `rdd_b`)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">func</span><span class="o">.</span><span class="vm">__code__</span><span class="o">.</span><span class="n">co_argcount</span> <span class="o">==</span> <span class="mi">2</span><span class="p">:</span>
<span class="n">oldfunc</span> <span class="o">=</span> <span class="n">func</span>
<span class="k">def</span><span class="w"> </span><span class="nf">func</span><span class="p">(</span><span class="n">_</span><span class="p">:</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">a</span><span class="p">:</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">],</span> <span class="n">b</span><span class="p">:</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">RDD</span><span class="p">[</span><span class="n">V</span><span class="p">]:</span>
<span class="k">return</span> <span class="n">oldfunc</span><span class="p">(</span><span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">)</span> <span class="c1"># type: ignore[call-arg, arg-type]</span>
<span class="k">assert</span> <span class="n">func</span><span class="o">.</span><span class="vm">__code__</span><span class="o">.</span><span class="n">co_argcount</span> <span class="o">==</span> <span class="mi">3</span><span class="p">,</span> <span class="s2">&quot;func should take two or three arguments&quot;</span>
<span class="n">jfunc</span> <span class="o">=</span> <span class="n">TransformFunction</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="p">,</span>
<span class="n">func</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">,</span>
<span class="n">other</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">dstream</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonTransformed2DStream</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">dstream</span><span class="p">(),</span> <span class="n">other</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">dstream</span><span class="p">(),</span> <span class="n">jfunc</span>
<span class="p">)</span>
<span class="n">jrdd_serializer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jrdd_deserializer</span> <span class="k">if</span> <span class="n">keepSerializer</span> <span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">serializer</span>
<span class="k">return</span> <span class="n">DStream</span><span class="p">(</span><span class="n">dstream</span><span class="o">.</span><span class="n">asJavaDStream</span><span class="p">(),</span> <span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span><span class="p">,</span> <span class="n">jrdd_serializer</span><span class="p">)</span></div>
<div class="viewcode-block" id="DStream.repartition"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.repartition.html#pyspark.streaming.DStream.repartition">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">repartition</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">:</span> <span class="nb">int</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream with an increased or decreased level of parallelism.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transform</span><span class="p">(</span><span class="k">lambda</span> <span class="n">rdd</span><span class="p">:</span> <span class="n">rdd</span><span class="o">.</span><span class="n">repartition</span><span class="p">(</span><span class="n">numPartitions</span><span class="p">))</span></div>
<span class="nd">@property</span>
<span class="k">def</span><span class="w"> </span><span class="nf">_slideDuration</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return the slideDuration in seconds of this DStream</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">dstream</span><span class="p">()</span><span class="o">.</span><span class="n">slideDuration</span><span class="p">()</span><span class="o">.</span><span class="n">milliseconds</span><span class="p">()</span> <span class="o">/</span> <span class="mf">1000.0</span>
<div class="viewcode-block" id="DStream.union"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.union.html#pyspark.streaming.DStream.union">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">union</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span> <span class="n">other</span><span class="p">:</span> <span class="s2">&quot;DStream[U]&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Union[T, U]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by unifying data of another DStream with this DStream.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> other : :class:`DStream`</span>
<span class="sd"> Another DStream having the same interval (i.e., slideDuration)</span>
<span class="sd"> as this DStream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_slideDuration</span> <span class="o">!=</span> <span class="n">other</span><span class="o">.</span><span class="n">_slideDuration</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;the two DStream should have same slide duration&quot;</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transformWith</span><span class="p">(</span><span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span><span class="o">.</span><span class="n">union</span><span class="p">(</span><span class="n">b</span><span class="p">),</span> <span class="n">other</span><span class="p">,</span> <span class="kc">True</span><span class="p">)</span></div>
<div class="viewcode-block" id="DStream.cogroup"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.cogroup.html#pyspark.streaming.DStream.cogroup">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">cogroup</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span>
<span class="n">other</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, U]]&quot;</span><span class="p">,</span>
<span class="n">numPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, Tuple[ResultIterable[V], ResultIterable[U]]]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying &#39;cogroup&#39; between RDDs of this</span>
<span class="sd"> DStream and `other` DStream.</span>
<span class="sd"> Hash partitioning is used to generate the RDDs with `numPartitions` partitions.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">numPartitions</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">numPartitions</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">defaultParallelism</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transformWith</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span><span class="o">.</span><span class="n">cogroup</span><span class="p">(</span><span class="n">b</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">),</span>
<span class="n">other</span><span class="p">,</span>
<span class="p">)</span></div>
<div class="viewcode-block" id="DStream.join"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.join.html#pyspark.streaming.DStream.join">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">join</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span>
<span class="n">other</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, U]]&quot;</span><span class="p">,</span>
<span class="n">numPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, Tuple[V, U]]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying &#39;join&#39; between RDDs of this DStream and</span>
<span class="sd"> `other` DStream.</span>
<span class="sd"> Hash partitioning is used to generate the RDDs with `numPartitions`</span>
<span class="sd"> partitions.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">numPartitions</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">numPartitions</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">defaultParallelism</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transformWith</span><span class="p">(</span><span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">b</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">),</span> <span class="n">other</span><span class="p">)</span></div>
<div class="viewcode-block" id="DStream.leftOuterJoin"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.leftOuterJoin.html#pyspark.streaming.DStream.leftOuterJoin">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">leftOuterJoin</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span>
<span class="n">other</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, U]]&quot;</span><span class="p">,</span>
<span class="n">numPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, Tuple[V, Optional[U]]]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying &#39;left outer join&#39; between RDDs of this DStream and</span>
<span class="sd"> `other` DStream.</span>
<span class="sd"> Hash partitioning is used to generate the RDDs with `numPartitions`</span>
<span class="sd"> partitions.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">numPartitions</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">numPartitions</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">defaultParallelism</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transformWith</span><span class="p">(</span><span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span><span class="o">.</span><span class="n">leftOuterJoin</span><span class="p">(</span><span class="n">b</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">),</span> <span class="n">other</span><span class="p">)</span></div>
<div class="viewcode-block" id="DStream.rightOuterJoin"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.rightOuterJoin.html#pyspark.streaming.DStream.rightOuterJoin">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">rightOuterJoin</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span>
<span class="n">other</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, U]]&quot;</span><span class="p">,</span>
<span class="n">numPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, Tuple[Optional[V], U]]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying &#39;right outer join&#39; between RDDs of this DStream and</span>
<span class="sd"> `other` DStream.</span>
<span class="sd"> Hash partitioning is used to generate the RDDs with `numPartitions`</span>
<span class="sd"> partitions.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">numPartitions</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">numPartitions</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">defaultParallelism</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transformWith</span><span class="p">(</span><span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span><span class="o">.</span><span class="n">rightOuterJoin</span><span class="p">(</span><span class="n">b</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">),</span> <span class="n">other</span><span class="p">)</span></div>
<div class="viewcode-block" id="DStream.fullOuterJoin"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.fullOuterJoin.html#pyspark.streaming.DStream.fullOuterJoin">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">fullOuterJoin</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span>
<span class="n">other</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, U]]&quot;</span><span class="p">,</span>
<span class="n">numPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, Tuple[Optional[V], Optional[U]]]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying &#39;full outer join&#39; between RDDs of this DStream and</span>
<span class="sd"> `other` DStream.</span>
<span class="sd"> Hash partitioning is used to generate the RDDs with `numPartitions`</span>
<span class="sd"> partitions.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">numPartitions</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">numPartitions</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">defaultParallelism</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">transformWith</span><span class="p">(</span><span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span><span class="o">.</span><span class="n">fullOuterJoin</span><span class="p">(</span><span class="n">b</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">),</span> <span class="n">other</span><span class="p">)</span></div>
<span class="k">def</span><span class="w"> </span><span class="nf">_jtime</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">datetime</span><span class="p">,</span> <span class="nb">int</span><span class="p">,</span> <span class="nb">float</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">JavaObject</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Convert datetime or unix_timestamp into Time&quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">timestamp</span><span class="p">,</span> <span class="n">datetime</span><span class="p">):</span>
<span class="n">timestamp</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">mktime</span><span class="p">(</span><span class="n">timestamp</span><span class="o">.</span><span class="n">timetuple</span><span class="p">())</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">Time</span><span class="p">(</span><span class="nb">int</span><span class="p">(</span><span class="n">timestamp</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">))</span>
<div class="viewcode-block" id="DStream.slice"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.slice.html#pyspark.streaming.DStream.slice">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">slice</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">begin</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">datetime</span><span class="p">,</span> <span class="nb">int</span><span class="p">],</span> <span class="n">end</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">datetime</span><span class="p">,</span> <span class="nb">int</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">List</span><span class="p">[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return all the RDDs between &#39;begin&#39; to &#39;end&#39; (both included)</span>
<span class="sd"> `begin`, `end` could be datetime.datetime() or unix_timestamp</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">jrdds</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">slice</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jtime</span><span class="p">(</span><span class="n">begin</span><span class="p">),</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jtime</span><span class="p">(</span><span class="n">end</span><span class="p">))</span>
<span class="k">return</span> <span class="p">[</span><span class="n">RDD</span><span class="p">(</span><span class="n">jrdd</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">)</span> <span class="k">for</span> <span class="n">jrdd</span> <span class="ow">in</span> <span class="n">jrdds</span><span class="p">]</span></div>
<!-- _validate_window_param: raises ValueError unless int(window * 1000) (and int(slide * 1000), when slide is truthy) is an exact multiple of the parent DStream's slide duration in milliseconds; window/slide are presumably in seconds, TODO confirm --><span class="k">def</span><span class="w"> </span><span class="nf">_validate_window_param</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">window</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> <span class="n">slide</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">duration</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">dstream</span><span class="p">()</span><span class="o">.</span><span class="n">slideDuration</span><span class="p">()</span><span class="o">.</span><span class="n">milliseconds</span><span class="p">()</span>
<span class="k">if</span> <span class="nb">int</span><span class="p">(</span><span class="n">window</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">)</span> <span class="o">%</span> <span class="n">duration</span> <span class="o">!=</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;windowDuration must be multiple of the parent &quot;</span>
<span class="s2">&quot;dstream&#39;s slide (batch) duration (</span><span class="si">%d</span><span class="s2"> ms)&quot;</span> <span class="o">%</span> <span class="n">duration</span>
<span class="p">)</span>
<!-- the slide check is skipped when slide is None or 0 --><span class="k">if</span> <span class="n">slide</span> <span class="ow">and</span> <span class="nb">int</span><span class="p">(</span><span class="n">slide</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">)</span> <span class="o">%</span> <span class="n">duration</span> <span class="o">!=</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;slideDuration must be multiple of the parent &quot;</span>
<span class="s2">&quot;dstream&#39;s slide (batch) duration (</span><span class="si">%d</span><span class="s2"> ms)&quot;</span> <span class="o">%</span> <span class="n">duration</span>
<span class="p">)</span>
<!-- DStream.window: validates the durations, then wraps the JVM window(d) or window(d, s) result in a new Python DStream that reuses this DStream's deserializer --><div class="viewcode-block" id="DStream.window"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.window.html#pyspark.streaming.DStream.window">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">window</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">windowDuration</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> <span class="n">slideDuration</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream in which each RDD contains all the elements seen in a</span>
<span class="sd"> sliding window of time over this DStream.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> windowDuration : int</span>
<span class="sd"> width of the window; must be a multiple of this DStream&#39;s</span>
<span class="sd"> batching interval</span>
<span class="sd"> slideDuration : int, optional</span>
<span class="sd"> sliding interval of the window (i.e., the interval after which</span>
<span class="sd"> the new DStream will generate RDDs); must be a multiple of this</span>
<span class="sd"> DStream&#39;s batching interval</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_validate_window_param</span><span class="p">(</span><span class="n">windowDuration</span><span class="p">,</span> <span class="n">slideDuration</span><span class="p">)</span>
<span class="n">d</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span><span class="o">.</span><span class="n">_jduration</span><span class="p">(</span><span class="n">windowDuration</span><span class="p">)</span>
<span class="k">if</span> <span class="n">slideDuration</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">DStream</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">window</span><span class="p">(</span><span class="n">d</span><span class="p">),</span> <span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">)</span>
<span class="n">s</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span><span class="o">.</span><span class="n">_jduration</span><span class="p">(</span><span class="n">slideDuration</span><span class="p">)</span>
<span class="k">return</span> <span class="n">DStream</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">window</span><span class="p">(</span><span class="n">d</span><span class="p">,</span> <span class="n">s</span><span class="p">),</span> <span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">)</span></div>
<!-- DStream.reduceByWindow: keys every element with the constant 1, runs reduceByKeyAndWindow with a single partition, then strips the key back off --><div class="viewcode-block" id="DStream.reduceByWindow"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.reduceByWindow.html#pyspark.streaming.DStream.reduceByWindow">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">reduceByWindow</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span>
<span class="n">reduceFunc</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">T</span><span class="p">,</span> <span class="n">T</span><span class="p">],</span> <span class="n">T</span><span class="p">],</span>
<span class="n">invReduceFunc</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Callable</span><span class="p">[[</span><span class="n">T</span><span class="p">,</span> <span class="n">T</span><span class="p">],</span> <span class="n">T</span><span class="p">]],</span>
<span class="n">windowDuration</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">slideDuration</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream in which each RDD has a single element generated by reducing all</span>
<span class="sd"> elements in a sliding window over this DStream.</span>
<span class="sd"> If `invReduceFunc` is not None, the reduction is done incrementally</span>
<span class="sd"> using the old window&#39;s reduced value:</span>
<span class="sd"> 1. reduce the new values that entered the window (e.g., adding new counts)</span>
<span class="sd"> 2. &quot;inverse reduce&quot; the old values that left the window (e.g., subtracting old counts)</span>
<span class="sd"> This is more efficient than when `invReduceFunc` is None.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> reduceFunc : function</span>
<span class="sd"> associative and commutative reduce function</span>
<span class="sd"> invReduceFunc : function or None</span>
<span class="sd"> inverse reduce function of `reduceFunc`, such that for all y</span>
<span class="sd"> and invertible x:</span>
<span class="sd"> `invReduceFunc(reduceFunc(x, y), x) = y`</span>
<span class="sd"> windowDuration : int</span>
<span class="sd"> width of the window; must be a multiple of this DStream&#39;s</span>
<span class="sd"> batching interval</span>
<span class="sd"> slideDuration : int</span>
<span class="sd"> sliding interval of the window (i.e., the interval after which</span>
<span class="sd"> the new DStream will generate RDDs); must be a multiple of this</span>
<span class="sd"> DStream&#39;s batching interval</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">keyed</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="n">x</span><span class="p">))</span>
<span class="n">reduced</span> <span class="o">=</span> <span class="n">keyed</span><span class="o">.</span><span class="n">reduceByKeyAndWindow</span><span class="p">(</span>
<span class="n">reduceFunc</span><span class="p">,</span> <span class="n">invReduceFunc</span><span class="p">,</span> <span class="n">windowDuration</span><span class="p">,</span> <span class="n">slideDuration</span><span class="p">,</span> <span class="mi">1</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">reduced</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">kv</span><span class="p">:</span> <span class="n">kv</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span></div>
<!-- DStream.countByWindow: maps every element to 1 and sums the ones with reduceByWindow, passing operator.sub as the inverse so the windowed count is maintained incrementally --><div class="viewcode-block" id="DStream.countByWindow"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.countByWindow.html#pyspark.streaming.DStream.countByWindow">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">countByWindow</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span> <span class="n">windowDuration</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span> <span class="n">slideDuration</span><span class="p">:</span> <span class="nb">int</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[int]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream in which each RDD has a single element generated</span>
<span class="sd"> by counting the number of elements in a window over this DStream.</span>
<span class="sd"> windowDuration and slideDuration are as defined in the window() operation.</span>
<span class="sd"> This is equivalent to window(windowDuration, slideDuration).count(),</span>
<span class="sd"> but will be more efficient if window is large.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="mi">1</span><span class="p">)</span><span class="o">.</span><span class="n">reduceByWindow</span><span class="p">(</span>
<span class="n">operator</span><span class="o">.</span><span class="n">add</span><span class="p">,</span> <span class="n">operator</span><span class="o">.</span><span class="n">sub</span><span class="p">,</span> <span class="n">windowDuration</span><span class="p">,</span> <span class="n">slideDuration</span>
<span class="p">)</span></div>
<!-- DStream.countByValueAndWindow: pairs each element with 1, sums per element over the window incrementally (operator.add / operator.sub), and drops elements whose windowed count is not positive --><div class="viewcode-block" id="DStream.countByValueAndWindow"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.countByValueAndWindow.html#pyspark.streaming.DStream.countByValueAndWindow">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">countByValueAndWindow</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[T]&quot;</span><span class="p">,</span>
<span class="n">windowDuration</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">slideDuration</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">numPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[T, int]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream in which each RDD contains (element, count) pairs giving the</span>
<span class="sd"> count of each distinct element in RDDs in a sliding window over this DStream.</span>
<span class="sd"> Elements whose count in the window is not positive are omitted.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> windowDuration : int</span>
<span class="sd"> width of the window; must be a multiple of this DStream&#39;s</span>
<span class="sd"> batching interval</span>
<span class="sd"> slideDuration : int</span>
<span class="sd"> sliding interval of the window (i.e., the interval after which</span>
<span class="sd"> the new DStream will generate RDDs); must be a multiple of this</span>
<span class="sd"> DStream&#39;s batching interval</span>
<span class="sd"> numPartitions : int, optional</span>
<span class="sd"> number of partitions of each RDD in the new DStream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">keyed</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="mi">1</span><span class="p">))</span>
<span class="n">counted</span> <span class="o">=</span> <span class="n">keyed</span><span class="o">.</span><span class="n">reduceByKeyAndWindow</span><span class="p">(</span>
<span class="n">operator</span><span class="o">.</span><span class="n">add</span><span class="p">,</span> <span class="n">operator</span><span class="o">.</span><span class="n">sub</span><span class="p">,</span> <span class="n">windowDuration</span><span class="p">,</span> <span class="n">slideDuration</span><span class="p">,</span> <span class="n">numPartitions</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">counted</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="k">lambda</span> <span class="n">kv</span><span class="p">:</span> <span class="n">kv</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">)</span></div>
<!-- DStream.groupByKeyAndWindow: wraps each value in a one-element list, concatenates lists with extend as the reduce function, drops the first len(b) items as the inverse, and wraps each final list in ResultIterable --><div class="viewcode-block" id="DStream.groupByKeyAndWindow"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.groupByKeyAndWindow.html#pyspark.streaming.DStream.groupByKeyAndWindow">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">groupByKeyAndWindow</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span>
<span class="n">windowDuration</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">slideDuration</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">numPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, Iterable[V]]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying `groupByKey` over a sliding window.</span>
<span class="sd"> Similar to `DStream.groupByKey()`, but applies it over a sliding window.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> windowDuration : int</span>
<span class="sd"> width of the window; must be a multiple of this DStream&#39;s</span>
<span class="sd"> batching interval</span>
<span class="sd"> slideDuration : int</span>
<span class="sd"> sliding interval of the window (i.e., the interval after which</span>
<span class="sd"> the new DStream will generate RDDs); must be a multiple of this</span>
<span class="sd"> DStream&#39;s batching interval</span>
<span class="sd"> numPartitions : int, optional</span>
<span class="sd"> Number of partitions of each RDD in the new DStream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">ls</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">mapValues</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">[</span><span class="n">x</span><span class="p">])</span>
<span class="n">grouped</span> <span class="o">=</span> <span class="n">ls</span><span class="o">.</span><span class="n">reduceByKeyAndWindow</span><span class="p">(</span>
<!-- list.extend returns None, so the trailing "or a" yields the mutated list a --><span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">b</span><span class="p">)</span> <span class="ow">or</span> <span class="n">a</span><span class="p">,</span> <span class="c1"># type: ignore[func-returns-value]</span>
<span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span><span class="p">[</span><span class="nb">len</span><span class="p">(</span><span class="n">b</span><span class="p">)</span> <span class="p">:],</span>
<span class="n">windowDuration</span><span class="p">,</span>
<span class="n">slideDuration</span><span class="p">,</span>
<span class="n">numPartitions</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">grouped</span><span class="o">.</span><span class="n">mapValues</span><span class="p">(</span><span class="n">ResultIterable</span><span class="p">)</span></div>
<!-- DStream.reduceByKeyAndWindow: with invFunc, builds a JVM PythonReducedWindowedDStream from incremental reduce / inverse-reduce TransformFunctions; without invFunc, reduces the whole window via window().reduceByKey() and filterFunc is not applied --><div class="viewcode-block" id="DStream.reduceByKeyAndWindow"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.reduceByKeyAndWindow.html#pyspark.streaming.DStream.reduceByKeyAndWindow">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">reduceByKeyAndWindow</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span>
<span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">V</span><span class="p">,</span> <span class="n">V</span><span class="p">],</span> <span class="n">V</span><span class="p">],</span>
<span class="n">invFunc</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Callable</span><span class="p">[[</span><span class="n">V</span><span class="p">,</span> <span class="n">V</span><span class="p">],</span> <span class="n">V</span><span class="p">]],</span>
<span class="n">windowDuration</span><span class="p">:</span> <span class="nb">int</span><span class="p">,</span>
<span class="n">slideDuration</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">numPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">filterFunc</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Callable</span><span class="p">[[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">V</span><span class="p">]],</span> <span class="nb">bool</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new DStream by applying incremental `reduceByKey` over a sliding window.</span>
<span class="sd"> The reduced value over a new window is calculated using the old window&#39;s reduced value:</span>
<span class="sd"> 1. reduce the new values that entered the window (e.g., adding new counts)</span>
<span class="sd"> 2. &quot;inverse reduce&quot; the old values that left the window (e.g., subtracting old counts)</span>
<span class="sd"> `invFunc` can be None, in which case all the RDDs in the window are reduced, which</span>
<span class="sd"> could be slower than having `invFunc`.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> func : function</span>
<span class="sd"> associative and commutative reduce function</span>
<span class="sd"> invFunc : function or None</span>
<span class="sd"> inverse function of `func`</span>
<span class="sd"> windowDuration : int</span>
<span class="sd"> width of the window; must be a multiple of this DStream&#39;s</span>
<span class="sd"> batching interval</span>
<span class="sd"> slideDuration : int, optional</span>
<span class="sd"> sliding interval of the window (i.e., the interval after which</span>
<span class="sd"> the new DStream will generate RDDs); must be a multiple of this</span>
<span class="sd"> DStream&#39;s batching interval</span>
<span class="sd"> numPartitions : int, optional</span>
<span class="sd"> number of partitions of each RDD in the new DStream.</span>
<span class="sd"> filterFunc : function, optional</span>
<span class="sd"> function to filter expired key-value pairs;</span>
<span class="sd"> only pairs that satisfy the function are retained.</span>
<span class="sd"> Set this to None if you do not want to filter.</span>
<span class="sd"> It is only applied when `invFunc` is not None.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_validate_window_param</span><span class="p">(</span><span class="n">windowDuration</span><span class="p">,</span> <span class="n">slideDuration</span><span class="p">)</span>
<span class="k">if</span> <span class="n">numPartitions</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">numPartitions</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">defaultParallelism</span>
<span class="n">reduced</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">reduceByKey</span><span class="p">(</span><span class="n">func</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">)</span>
<span class="k">if</span> <span class="n">invFunc</span><span class="p">:</span>
<span class="k">def</span><span class="w"> </span><span class="nf">reduceFunc</span><span class="p">(</span><span class="n">t</span><span class="p">:</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">a</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">Any</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Any</span><span class="p">:</span>
<span class="n">b</span> <span class="o">=</span> <span class="n">b</span><span class="o">.</span><span class="n">reduceByKey</span><span class="p">(</span><span class="n">func</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">)</span>
<span class="n">r</span> <span class="o">=</span> <span class="n">a</span><span class="o">.</span><span class="n">union</span><span class="p">(</span><span class="n">b</span><span class="p">)</span><span class="o">.</span><span class="n">reduceByKey</span><span class="p">(</span><span class="n">func</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">)</span> <span class="k">if</span> <span class="n">a</span> <span class="k">else</span> <span class="n">b</span>
<span class="k">if</span> <span class="n">filterFunc</span><span class="p">:</span>
<span class="n">r</span> <span class="o">=</span> <span class="n">r</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">filterFunc</span><span class="p">)</span>
<span class="k">return</span> <span class="n">r</span>
<span class="k">def</span><span class="w"> </span><span class="nf">invReduceFunc</span><span class="p">(</span><span class="n">t</span><span class="p">:</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">a</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">Any</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Any</span><span class="p">:</span>
<span class="n">b</span> <span class="o">=</span> <span class="n">b</span><span class="o">.</span><span class="n">reduceByKey</span><span class="p">(</span><span class="n">func</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">)</span>
<span class="n">joined</span> <span class="o">=</span> <span class="n">a</span><span class="o">.</span><span class="n">leftOuterJoin</span><span class="p">(</span><span class="n">b</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">)</span>
<span class="k">return</span> <span class="n">joined</span><span class="o">.</span><span class="n">mapValues</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">kv</span><span class="p">:</span> <span class="n">invFunc</span><span class="p">(</span><span class="n">kv</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">kv</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span> <span class="c1"># type: ignore[misc]</span>
<span class="k">if</span> <span class="n">kv</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="k">else</span> <span class="n">kv</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="p">)</span>
<span class="n">jreduceFunc</span> <span class="o">=</span> <span class="n">TransformFunction</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="p">,</span> <span class="n">reduceFunc</span><span class="p">,</span> <span class="n">reduced</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">)</span>
<span class="n">jinvReduceFunc</span> <span class="o">=</span> <span class="n">TransformFunction</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="p">,</span> <span class="n">invReduceFunc</span><span class="p">,</span> <span class="n">reduced</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">)</span>
<span class="k">if</span> <span class="n">slideDuration</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">slideDuration</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_slideDuration</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">dstream</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonReducedWindowedDStream</span><span class="p">(</span>
<span class="n">reduced</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">dstream</span><span class="p">(),</span>
<span class="n">jreduceFunc</span><span class="p">,</span>
<span class="n">jinvReduceFunc</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span><span class="o">.</span><span class="n">_jduration</span><span class="p">(</span><span class="n">windowDuration</span><span class="p">),</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span><span class="o">.</span><span class="n">_jduration</span><span class="p">(</span><span class="n">slideDuration</span><span class="p">),</span> <span class="c1"># type: ignore[arg-type]</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">DStream</span><span class="p">(</span><span class="n">dstream</span><span class="o">.</span><span class="n">asJavaDStream</span><span class="p">(),</span> <span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">serializer</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">reduced</span><span class="o">.</span><span class="n">window</span><span class="p">(</span><span class="n">windowDuration</span><span class="p">,</span> <span class="n">slideDuration</span><span class="p">)</span><span class="o">.</span><span class="n">reduceByKey</span><span class="p">(</span>
<span class="n">func</span><span class="p">,</span> <span class="n">numPartitions</span> <span class="c1"># type: ignore[arg-type]</span>
<span class="p">)</span></div>
<div class="viewcode-block" id="DStream.updateStateByKey"><a class="viewcode-back" href="../../../reference/api/pyspark.streaming.DStream.updateStateByKey.html#pyspark.streaming.DStream.updateStateByKey">[docs]</a> <span class="k">def</span><span class="w"> </span><span class="nf">updateStateByKey</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="s2">&quot;DStream[Tuple[K, V]]&quot;</span><span class="p">,</span>
<span class="n">updateFunc</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">Iterable</span><span class="p">[</span><span class="n">V</span><span class="p">],</span> <span class="n">Optional</span><span class="p">[</span><span class="n">S</span><span class="p">]],</span> <span class="n">S</span><span class="p">],</span>
<span class="n">numPartitions</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">initialRDD</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="n">RDD</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">S</span><span class="p">]],</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">S</span><span class="p">]]]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DStream[Tuple[K, S]]&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return a new &quot;state&quot; DStream where the state for each key is updated by applying</span>
<span class="sd"> the given function on the previous state of the key and the new values of the key.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> updateFunc : function</span>
<span class="sd"> State update function. If this function returns None, then the</span>
<span class="sd"> corresponding state key-value pair will be eliminated.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">numPartitions</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">numPartitions</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">defaultParallelism</span>
<span class="k">if</span> <span class="n">initialRDD</span> <span class="ow">and</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">initialRDD</span><span class="p">,</span> <span class="n">RDD</span><span class="p">):</span>
<span class="n">initialRDD</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="n">initialRDD</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="nf">reduceFunc</span><span class="p">(</span><span class="n">t</span><span class="p">:</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">a</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">Any</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Any</span><span class="p">:</span>
<span class="k">if</span> <span class="n">a</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">g</span> <span class="o">=</span> <span class="n">b</span><span class="o">.</span><span class="n">groupByKey</span><span class="p">(</span><span class="n">numPartitions</span><span class="p">)</span><span class="o">.</span><span class="n">mapValues</span><span class="p">(</span><span class="k">lambda</span> <span class="n">vs</span><span class="p">:</span> <span class="p">(</span><span class="nb">list</span><span class="p">(</span><span class="n">vs</span><span class="p">),</span> <span class="kc">None</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">g</span> <span class="o">=</span> <span class="n">a</span><span class="o">.</span><span class="n">cogroup</span><span class="p">(</span><span class="n">b</span><span class="o">.</span><span class="n">partitionBy</span><span class="p">(</span><span class="n">cast</span><span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="n">numPartitions</span><span class="p">)),</span> <span class="n">numPartitions</span><span class="p">)</span>
<span class="n">g</span> <span class="o">=</span> <span class="n">g</span><span class="o">.</span><span class="n">mapValues</span><span class="p">(</span><span class="k">lambda</span> <span class="n">ab</span><span class="p">:</span> <span class="p">(</span><span class="nb">list</span><span class="p">(</span><span class="n">ab</span><span class="p">[</span><span class="mi">1</span><span class="p">]),</span> <span class="nb">list</span><span class="p">(</span><span class="n">ab</span><span class="p">[</span><span class="mi">0</span><span class="p">])[</span><span class="mi">0</span><span class="p">]</span> <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">ab</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> <span class="k">else</span> <span class="kc">None</span><span class="p">))</span>
<span class="n">state</span> <span class="o">=</span> <span class="n">g</span><span class="o">.</span><span class="n">mapValues</span><span class="p">(</span><span class="k">lambda</span> <span class="n">vs_s</span><span class="p">:</span> <span class="n">updateFunc</span><span class="p">(</span><span class="n">vs_s</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">vs_s</span><span class="p">[</span><span class="mi">1</span><span class="p">]))</span>
<span class="k">return</span> <span class="n">state</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="k">lambda</span> <span class="n">k_v</span><span class="p">:</span> <span class="n">k_v</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">)</span>
<span class="n">jreduceFunc</span> <span class="o">=</span> <span class="n">TransformFunction</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="p">,</span>
<span class="n">reduceFunc</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">serializer</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">if</span> <span class="n">initialRDD</span><span class="p">:</span>
<span class="n">initialRDD</span> <span class="o">=</span> <span class="n">cast</span><span class="p">(</span><span class="n">RDD</span><span class="p">[</span><span class="n">Tuple</span><span class="p">[</span><span class="n">K</span><span class="p">,</span> <span class="n">S</span><span class="p">]],</span> <span class="n">initialRDD</span><span class="p">)</span><span class="o">.</span><span class="n">_reserialize</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">)</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">dstream</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonStateDStream</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">dstream</span><span class="p">(),</span>
<span class="n">jreduceFunc</span><span class="p">,</span>
<span class="n">initialRDD</span><span class="o">.</span><span class="n">_jrdd</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">dstream</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonStateDStream</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">dstream</span><span class="p">(),</span> <span class="n">jreduceFunc</span><span class="p">)</span>
<span class="k">return</span> <span class="n">DStream</span><span class="p">(</span><span class="n">dstream</span><span class="o">.</span><span class="n">asJavaDStream</span><span class="p">(),</span> <span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">serializer</span><span class="p">)</span></div></div>
<span class="k">class</span><span class="w"> </span><span class="nc">TransformedDStream</span><span class="p">(</span><span class="n">DStream</span><span class="p">[</span><span class="n">U</span><span class="p">]):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> TransformedDStream is a DStream generated by a Python function</span>
<span class="sd"> transforming each RDD of a DStream into another RDD.</span>
<span class="sd"> Multiple continuous transformations of DStream can be combined into</span>
<span class="sd"> one transformation.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nd">@overload</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">:</span> <span class="n">DStream</span><span class="p">[</span><span class="n">U</span><span class="p">],</span> <span class="n">prev</span><span class="p">:</span> <span class="n">DStream</span><span class="p">[</span><span class="n">T</span><span class="p">],</span> <span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]]):</span>
<span class="o">...</span>
<span class="nd">@overload</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">:</span> <span class="n">DStream</span><span class="p">[</span><span class="n">U</span><span class="p">],</span>
<span class="n">prev</span><span class="p">:</span> <span class="n">DStream</span><span class="p">[</span><span class="n">T</span><span class="p">],</span>
<span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">datetime</span><span class="p">,</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]],</span>
<span class="p">):</span>
<span class="o">...</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">prev</span><span class="p">:</span> <span class="n">DStream</span><span class="p">[</span><span class="n">T</span><span class="p">],</span>
<span class="n">func</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">Callable</span><span class="p">[[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]],</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">datetime</span><span class="p">,</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]]],</span>
<span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span> <span class="o">=</span> <span class="n">prev</span><span class="o">.</span><span class="n">_ssc</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_sc</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_ssc</span><span class="o">.</span><span class="n">_sc</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jrdd_deserializer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">serializer</span>
<span class="bp">self</span><span class="o">.</span><span class="n">is_cached</span> <span class="o">=</span> <span class="kc">False</span>
<span class="bp">self</span><span class="o">.</span><span class="n">is_checkpointed</span> <span class="o">=</span> <span class="kc">False</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jdstream_val</span> <span class="o">=</span> <span class="kc">None</span>
<span class="c1"># Using type() to avoid folding the functions and compacting the DStreams which are</span>
<span class="c1"># not strictly of type TransformedDStream (e.g. instances of its subclasses).</span>
<span class="k">if</span> <span class="nb">type</span><span class="p">(</span><span class="n">prev</span><span class="p">)</span> <span class="ow">is</span> <span class="n">TransformedDStream</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">prev</span><span class="o">.</span><span class="n">is_cached</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">prev</span><span class="o">.</span><span class="n">is_checkpointed</span><span class="p">:</span>
<span class="n">prev_func</span><span class="p">:</span> <span class="n">Callable</span> <span class="o">=</span> <span class="n">prev</span><span class="o">.</span><span class="n">func</span>
<span class="n">func</span> <span class="o">=</span> <span class="n">cast</span><span class="p">(</span><span class="n">Callable</span><span class="p">[[</span><span class="n">datetime</span><span class="p">,</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]],</span> <span class="n">func</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">func</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span>
<span class="n">Callable</span><span class="p">[[</span><span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]],</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">datetime</span><span class="p">,</span> <span class="n">RDD</span><span class="p">[</span><span class="n">T</span><span class="p">]],</span> <span class="n">RDD</span><span class="p">[</span><span class="n">U</span><span class="p">]]</span>
<span class="p">]</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">t</span><span class="p">,</span> <span class="n">rdd</span><span class="p">:</span> <span class="n">func</span><span class="p">(</span><span class="n">t</span><span class="p">,</span> <span class="n">prev_func</span><span class="p">(</span><span class="n">t</span><span class="p">,</span> <span class="n">rdd</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">prev</span><span class="p">:</span> <span class="n">DStream</span><span class="p">[</span><span class="n">T</span><span class="p">]</span> <span class="o">=</span> <span class="n">prev</span><span class="o">.</span><span class="n">prev</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">prev</span> <span class="o">=</span> <span class="n">prev</span>
<span class="bp">self</span><span class="o">.</span><span class="n">func</span> <span class="o">=</span> <span class="n">func</span>
<span class="nd">@property</span>
<span class="k">def</span><span class="w"> </span><span class="nf">_jdstream</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">JavaObject</span><span class="p">:</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jdstream_val</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jdstream_val</span>
<span class="n">jfunc</span> <span class="o">=</span> <span class="n">TransformFunction</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">func</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">prev</span><span class="o">.</span><span class="n">_jrdd_deserializer</span><span class="p">)</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">dstream</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">PythonTransformedDStream</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">prev</span><span class="o">.</span><span class="n">_jdstream</span><span class="o">.</span><span class="n">dstream</span><span class="p">(),</span> <span class="n">jfunc</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jdstream_val</span> <span class="o">=</span> <span class="n">dstream</span><span class="o">.</span><span class="n">asJavaDStream</span><span class="p">()</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jdstream_val</span>
</pre></div>
</div>
<!-- Previous / next buttons -->
<div class='prev-next-area'>
</div>
</main>
</div>
</div>
<script src="../../../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf"></script>
<footer class="footer mt-5 mt-md-0">
<div class="container">
<div class="footer-item">
<p class="copyright">
&copy; Copyright .<br>
</p>
</div>
<div class="footer-item">
<p class="sphinx-version">
Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 3.0.4.<br>
</p>
</div>
</div>
</footer>
</body>
</html>