blob: c73db8b067f3dea1ae70e9f928c89b0499e3f77d [file] [log] [blame]
<!DOCTYPE html>
<html class="no-js" lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Structured Streaming Programming Guide - Spark 4.1.0-preview1 Documentation</title>
<link rel="stylesheet" href="../css/bootstrap.min.css">
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=DM+Sans:ital,wght@0,400;0,500;0,700;1,400;1,500;1,700&amp;family=Courier+Prime:wght@400;700&amp;display=swap" rel="stylesheet">
<link href="../css/custom.css" rel="stylesheet">
<script src="../js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
<link rel="stylesheet" href="../css/pygments-default.css">
<link rel="stylesheet" href="../css/docsearch.min.css" />
<link rel="stylesheet" href="../css/docsearch.css">
<!-- Matomo -->
<script>
// Matomo tracking bootstrap.
// _paq is a global command queue: reuse one if it already exists on window,
// otherwise start a new array. Commands are pushed as arrays of
// [methodName, ...args]; presumably matomo.js (loaded asynchronously below)
// replays them in push order, so the ordering of the pushes matters.
var _paq = window._paq = window._paq || [];
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
// Queued before trackPageView so it is in effect for the page view
// (per the command name, this asks the tracker not to use cookies).
_paq.push(["disableCookies"]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
// IIFE keeps u/d/g/s out of the global scope. It queues the tracker endpoint
// and site id, then creates a script element for matomo.js with async=true and
// inserts it before the first script element in the document.
(function() {
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '40']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
<!-- End Matomo Code -->
</head>
<body class="global">
<!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
<nav class="navbar navbar-expand-lg navbar-dark p-0 px-4 fixed-top" style="background: #1d6890;" id="topbar">
<div class="navbar-brand"><a href="../index.html">
<img src="https://spark.apache.org/images/spark-logo-rev.svg" width="141" height="72" alt="Apache Spark documentation home"/></a><span class="version">4.1.0-preview1</span>
</div>
<button class="navbar-toggler" type="button" data-toggle="collapse"
data-target="#navbarCollapse" aria-controls="navbarCollapse"
aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarCollapse">
<ul class="navbar-nav me-auto">
<li class="nav-item"><a href="../index.html" class="nav-link">Overview</a></li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarQuickStart" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Programming Guides</a>
<div class="dropdown-menu" aria-labelledby="navbarQuickStart">
<a class="dropdown-item" href="../quick-start.html">Quick Start</a>
<a class="dropdown-item" href="../rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a>
<a class="dropdown-item" href="../sql-programming-guide.html">SQL, DataFrames, and Datasets</a>
<a class="dropdown-item" href="../streaming/index.html">Structured Streaming</a>
<a class="dropdown-item" href="../streaming-programming-guide.html">Spark Streaming (DStreams)</a>
<a class="dropdown-item" href="../ml-guide.html">MLlib (Machine Learning)</a>
<a class="dropdown-item" href="../graphx-programming-guide.html">GraphX (Graph Processing)</a>
<a class="dropdown-item" href="../sparkr.html">SparkR (R on Spark)</a>
<a class="dropdown-item" href="../api/python/getting_started/index.html">PySpark (Python on Spark)</a>
<a class="dropdown-item" href="../declarative-pipelines-programming-guide.html">Declarative Pipelines</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarAPIDocs" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">API Docs</a>
<div class="dropdown-menu" aria-labelledby="navbarAPIDocs">
<a class="dropdown-item" href="../api/python/index.html">Python</a>
<a class="dropdown-item" href="../api/scala/org/apache/spark/index.html">Scala</a>
<a class="dropdown-item" href="../api/java/index.html">Java</a>
<a class="dropdown-item" href="../api/R/index.html">R</a>
<a class="dropdown-item" href="../api/sql/index.html">SQL, Built-in Functions</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarDeploying" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Deploying</a>
<div class="dropdown-menu" aria-labelledby="navbarDeploying">
<a class="dropdown-item" href="../cluster-overview.html">Overview</a>
<a class="dropdown-item" href="../submitting-applications.html">Submitting Applications</a>
<div class="dropdown-divider"></div>
<a class="dropdown-item" href="../spark-standalone.html">Spark Standalone</a>
<a class="dropdown-item" href="../running-on-yarn.html">YARN</a>
<a class="dropdown-item" href="../running-on-kubernetes.html">Kubernetes</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarMore" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">More</a>
<div class="dropdown-menu" aria-labelledby="navbarMore">
<a class="dropdown-item" href="../configuration.html">Configuration</a>
<a class="dropdown-item" href="../monitoring.html">Monitoring</a>
<a class="dropdown-item" href="../tuning.html">Tuning Guide</a>
<a class="dropdown-item" href="../job-scheduling.html">Job Scheduling</a>
<a class="dropdown-item" href="../security.html">Security</a>
<a class="dropdown-item" href="../hardware-provisioning.html">Hardware Provisioning</a>
<a class="dropdown-item" href="../migration-guide.html">Migration Guide</a>
<div class="dropdown-divider"></div>
<a class="dropdown-item" href="../building-spark.html">Building Spark</a>
<a class="dropdown-item" href="https://spark.apache.org/contributing.html">Contributing to Spark</a>
<a class="dropdown-item" href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a>
</div>
</li>
<li class="nav-item">
<input type="text" id="docsearch-input" placeholder="Search the docs…" aria-label="Search the docs">
</li>
</ul>
<!--<span class="navbar-text navbar-right"><span class="version-text">v4.1.0-preview1</span></span>-->
</div>
</nav>
<div class="container">
<div class="left-menu-wrapper">
<div class="left-menu">
<h3><a href="../streaming/index.html">Structured Streaming Programming Guide</a></h3>
<ul>
<li>
<a href="../streaming/index.html">
Overview
</a>
</li>
<li>
<a href="../streaming/getting-started.html">
Getting Started
</a>
</li>
<li>
<a href="../streaming/apis-on-dataframes-and-datasets.html">
APIs on DataFrames and Datasets
</a>
</li>
<li>
<a href="../streaming/performance-tips.html">
Performance Tips
</a>
</li>
<li>
<a href="../streaming/additional-information.html">
Additional Information
</a>
</li>
</ul>
</div>
</div>
<input id="nav-trigger" class="nav-trigger" checked type="checkbox">
<label for="nav-trigger"></label>
<div class="content-with-sidebar mr-3" id="content">
<h1 class="title">Structured Streaming Programming Guide</h1>
<h1 id="overview">Overview</h1>
<p>TransformWithState is the new arbitrary stateful operator in Structured Streaming since the Apache Spark 4.0 release. This operator is the next generation replacement for the old mapGroupsWithState/flatMapGroupsWithState API in Scala and the applyInPandasWithState API in Python for arbitrary stateful processing in Apache Spark.</p>
<p>This operator supports a broad set of features, such as object-oriented stateful processor definitions, composite types, automatic TTL-based eviction, and timers, and can be used to build business-critical operational use cases.</p>
<h1 id="language-support">Language Support</h1>
<p><code class="language-plaintext highlighter-rouge">TransformWithState</code> is available in Scala, Java and Python.</p>
<p>Note that in Python there are two operators: <code class="language-plaintext highlighter-rouge">transformWithStateInPandas</code>, which works with the Pandas interface, and <code class="language-plaintext highlighter-rouge">transformWithState</code>, which works with the Row interface.</p>
<p>Given the popularity of Pandas and its rich, vectorized API, <code class="language-plaintext highlighter-rouge">transformWithStateInPandas</code> may be the preferred API for most users. The <code class="language-plaintext highlighter-rouge">transformWithState</code> API is better suited to use cases with high key cardinality, since the Pandas API incurs a considerably higher data-conversion cost. Users who aren&#8217;t familiar with Pandas may also find the Row-based API easier to learn.</p>
<h1 id="components-of-a-transformwithstate-query">Components of a TransformWithState Query</h1>
<p>A transformWithState query typically consists of the following components:</p>
<ul>
<li>Stateful Processor - A user-defined stateful processor that defines the stateful logic</li>
<li>Output Mode - Output mode for the query, such as Append, Update, etc.</li>
<li>Time Mode - Time mode for the query, such as EventTime, ProcessingTime, etc.</li>
<li>Initial State - An optional initial state batch dataframe used to pre-populate the state</li>
</ul>
<p>In the following sections, we will go through the above components in more detail.</p>
<h2 id="defining-a-stateful-processor">Defining a Stateful Processor</h2>
<p>A stateful processor is the core of the user-defined logic used to operate on the input events. A stateful processor is defined by extending the StatefulProcessor class and implementing a few methods.</p>
<p>A typical stateful processor deals with the following constructs:</p>
<ul>
<li>Input Records - Input records received by the stream</li>
<li>State Variables - Zero or more class specific members used to store user state</li>
<li>Output Records - Output records produced by the processor. Zero or more output records may be produced by the processor.</li>
</ul>
<p>A stateful processor uses the object-oriented paradigm to define the stateful logic. The stateful logic is defined by implementing the following methods:</p>
<ul>
<li><code class="language-plaintext highlighter-rouge">init</code> - Initialize the stateful processor and define any state variables as needed</li>
<li><code class="language-plaintext highlighter-rouge">handleInputRows</code> - Process input rows belonging to a grouping key and emit output if needed</li>
<li><code class="language-plaintext highlighter-rouge">handleExpiredTimer</code> - Handle expired timers and emit output if needed</li>
<li><code class="language-plaintext highlighter-rouge">close</code> - Perform any cleanup operations if needed</li>
<li><code class="language-plaintext highlighter-rouge">handleInitialState</code> - Optionally handle the initial state batch dataframe</li>
</ul>
<p>The methods above will be invoked by the Spark query engine when the operator is executed as part of a streaming query.</p>
<p>Note also that not all types of operations are supported in each of the methods. For example, users cannot register timers in the <code class="language-plaintext highlighter-rouge">init</code> method. Similarly, they cannot operate on input rows in the <code class="language-plaintext highlighter-rouge">handleExpiredTimer</code> method. The engine detects unsupported or incompatible operations and fails the query when they occur.</p>
<h3 id="using-the-statefulprocessorhandle">Using the StatefulProcessorHandle</h3>
<p>Many operations within the methods above can be performed using the <code class="language-plaintext highlighter-rouge">StatefulProcessorHandle</code> object. The <code class="language-plaintext highlighter-rouge">StatefulProcessorHandle</code> object provides methods to interact with the underlying state store. This object can be retrieved within the StatefulProcessor by invoking the <code class="language-plaintext highlighter-rouge">getHandle</code> method.</p>
<h3 id="using-state-variables">Using State Variables</h3>
<p>State variables are class specific members used to store user state. They need to be declared once and initialized within the <code class="language-plaintext highlighter-rouge">init</code> method of the stateful processor.</p>
<p>Initializing a state variable typically involves the following steps:</p>
<ul>
<li>Provide a unique name for the state variable (unique within the stateful processor definition)</li>
<li>Provide a type for the state variable (ValueState, ListState, MapState) - depending on the type, the appropriate method on the handle needs to be invoked</li>
<li>Provide a state encoder for the state variable (in Scala - this can be skipped if implicit encoders are available)</li>
<li>Provide an optional TTL config for the state variable</li>
</ul>
<h3 id="types-of-state-variables">Types of state variables</h3>
<p>State variables can be of the following types:</p>
<ul>
<li>Value State</li>
<li>List State</li>
<li>Map State</li>
</ul>
<p>Similar to collections in popular programming languages, the state types can be used to model data structures optimized for various types of operations on the underlying storage layer. For example, appends are optimized for ListState and point lookups are optimized for MapState.</p>
<h3 id="providing-state-encoders">Providing state encoders</h3>
<p>State encoders are used to serialize and deserialize the state variables. In Scala, the state encoders can be skipped if implicit encoders are available. In Java and Python, the state encoders need to be provided explicitly.
Built-in encoders for primitives, case classes and Java Bean classes are provided by default via the Spark SQL encoders.</p>
<h4 id="providing-implicit-encoders-in-scala">Providing implicit encoders in Scala</h4>
<p>In Scala, implicit encoders can be provided for case classes and primitive types. The <code class="language-plaintext highlighter-rouge">implicits</code> object is provided as part of the <code class="language-plaintext highlighter-rouge">StatefulProcessor</code> class. Within the StatefulProcessor definition, the user can simply import implicits with <code class="language-plaintext highlighter-rouge">import implicits._</code> and then does not need to pass the encoder explicitly.</p>
<h3 id="providing-ttl-for-state-variables">Providing TTL for state variables</h3>
<p>State variables can be configured with an optional TTL (Time-To-Live) value. The TTL value is used to automatically evict the state variable after the specified duration. The TTL value can be provided as a Duration.</p>
<h3 id="handling-input-rows">Handling input rows</h3>
<p>The <code class="language-plaintext highlighter-rouge">handleInputRows</code> method is used to process input rows belonging to a grouping key and emit output if needed. The method is invoked by the Spark query engine for each grouping key value received by the operator. If multiple rows belong to the same grouping key, the provided iterator will include all those rows.</p>
<h3 id="handling-expired-timers">Handling expired timers</h3>
<p>Within the <code class="language-plaintext highlighter-rouge">handleInputRows</code> or <code class="language-plaintext highlighter-rouge">handleExpiredTimer</code> methods, the stateful processor can register timers to be triggered at a later time. The <code class="language-plaintext highlighter-rouge">handleExpiredTimer</code> method is invoked by the Spark query engine when a timer set by the stateful processor has expired. This method is invoked once for each expired timer.
Here are a few timer properties that are supported:</p>
<ul>
<li>Multiple timers associated with the same grouping key can be registered</li>
<li>The engine provides the ability to list/add/remove timers as needed</li>
<li>Timers are also checkpointed as part of the query checkpoint and can be triggered on query restart as well.</li>
</ul>
<h3 id="handling-initial-state">Handling initial state</h3>
<p>The <code class="language-plaintext highlighter-rouge">handleInitialState</code> method is used to optionally handle the initial state batch dataframe. The initial state batch dataframe is used to pre-populate the state for the stateful processor. The method is invoked by the Spark query engine when the initial state batch dataframe is available.
This method is only called once in the lifetime of the query. This is invoked before any input rows are processed by the stateful processor.</p>
<h3 id="putting-it-all-together">Putting it all together</h3>
<p>Here is an example of a StatefulProcessor that implements a downtime detector. Each time a new value is seen for a given key, it updates the lastSeen state value, clears any existing timers, and resets a timer for the future.</p>
<p>When a timer expires, the application emits the elapsed time since the last observed event for the key. It then sets a new timer to emit an update 10 seconds later.</p>
<p>NOTE: The <code class="language-plaintext highlighter-rouge">python_Pandas</code> tab shows how to implement a StatefulProcessor for <code class="language-plaintext highlighter-rouge">transformWithStateInPandas</code>, and the <code class="language-plaintext highlighter-rouge">python_Row</code> tab shows how to implement a StatefulProcessor for <code class="language-plaintext highlighter-rouge">transformWithState</code>.</p>
<div class="codetabs">
<div data-lang="python_Pandas">
<figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">class</span> <span class="nc">DownTimeDetector</span><span class="p">(</span><span class="n">StatefulProcessor</span><span class="p">):</span>
    <span class="k">def</span> <span class="nf">init</span><span class="p">(</span><span class="n">self</span><span class="p">,</span> <span class="n">handle</span><span class="p">:</span> <span class="n">StatefulProcessorHandle</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="bp">None</span><span class="p">:</span>
        <span class="c1"># Define schema for the state value (timestamp)
</span>        <span class="n">state_schema</span> <span class="o">=</span> <span class="nc">StructType</span><span class="p">([</span><span class="nc">StructField</span><span class="p">(</span><span class="sh">"</span><span class="s">value</span><span class="sh">"</span><span class="p">,</span> <span class="nc">TimestampType</span><span class="p">(),</span> <span class="bp">True</span><span class="p">)])</span>
        <span class="n">self</span><span class="p">.</span><span class="n">handle</span> <span class="o">=</span> <span class="n">handle</span>
        <span class="c1"># Initialize state to store the last seen timestamp for each key
</span>        <span class="n">self</span><span class="p">.</span><span class="n">last_seen</span> <span class="o">=</span> <span class="n">handle</span><span class="p">.</span><span class="nf">getValueState</span><span class="p">(</span><span class="sh">"</span><span class="s">last_seen</span><span class="sh">"</span><span class="p">,</span> <span class="n">state_schema</span><span class="p">)</span>

    <span class="k">def</span> <span class="nf">handleExpiredTimer</span><span class="p">(</span><span class="n">self</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">timerValues</span><span class="p">,</span> <span class="n">expiredTimerInfo</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="p">.</span><span class="n">DataFrame</span><span class="p">]:</span>
        <span class="c1"># ValueState.get() returns a tuple; take the stored timestamp from it
</span>        <span class="n">latest_from_existing</span> <span class="o">=</span> <span class="n">self</span><span class="p">.</span><span class="n">last_seen</span><span class="p">.</span><span class="nf">get</span><span class="p">()[</span><span class="mi">0</span><span class="p">]</span>
        <span class="c1"># Calculate downtime duration
</span>        <span class="n">downtime_duration</span> <span class="o">=</span> <span class="n">timerValues</span><span class="p">.</span><span class="nf">getCurrentProcessingTimeInMs</span><span class="p">()</span> <span class="o">-</span> <span class="nf">int</span><span class="p">(</span><span class="n">latest_from_existing</span><span class="p">.</span><span class="nf">timestamp</span><span class="p">()</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">)</span>
        <span class="c1"># Register a new timer for 10 seconds in the future
</span>        <span class="n">self</span><span class="p">.</span><span class="n">handle</span><span class="p">.</span><span class="nf">registerTimer</span><span class="p">(</span><span class="n">timerValues</span><span class="p">.</span><span class="nf">getCurrentProcessingTimeInMs</span><span class="p">()</span> <span class="o">+</span> <span class="mi">10000</span><span class="p">)</span>
        <span class="c1"># Yield a DataFrame with the key and downtime duration
</span>        <span class="k">yield</span> <span class="n">pd</span><span class="p">.</span><span class="nc">DataFrame</span><span class="p">(</span>
            <span class="p">{</span>
                <span class="sh">"</span><span class="s">id</span><span class="sh">"</span><span class="p">:</span> <span class="n">key</span><span class="p">,</span>
                <span class="sh">"</span><span class="s">timeValues</span><span class="sh">"</span><span class="p">:</span> <span class="nf">str</span><span class="p">(</span><span class="n">downtime_duration</span><span class="p">),</span>
            <span class="p">}</span>
        <span class="p">)</span>

    <span class="k">def</span> <span class="nf">handleInputRows</span><span class="p">(</span><span class="n">self</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">rows</span><span class="p">,</span> <span class="n">timerValues</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">pd</span><span class="p">.</span><span class="n">DataFrame</span><span class="p">]:</span>
        <span class="c1"># Find the row with the maximum timestamp across every row of every input DataFrame
</span>        <span class="n">max_row</span> <span class="o">=</span> <span class="nf">max</span><span class="p">((</span><span class="nf">tuple</span><span class="p">(</span><span class="n">r</span><span class="p">)</span> <span class="k">for</span> <span class="n">pdf</span> <span class="ow">in</span> <span class="n">rows</span> <span class="k">for</span> <span class="n">r</span> <span class="ow">in</span> <span class="n">pdf</span><span class="p">.</span><span class="nf">itertuples</span><span class="p">(</span><span class="n">index</span><span class="o">=</span><span class="bp">False</span><span class="p">)),</span> <span class="n">key</span><span class="o">=</span><span class="k">lambda</span> <span class="n">row</span><span class="p">:</span> <span class="n">row</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span>

        <span class="c1"># Get the latest timestamp from existing state, or fall back to the epoch start if no state exists yet
</span>        <span class="k">if</span> <span class="n">self</span><span class="p">.</span><span class="n">last_seen</span><span class="p">.</span><span class="nf">exists</span><span class="p">():</span>
            <span class="n">latest_from_existing</span> <span class="o">=</span> <span class="n">self</span><span class="p">.</span><span class="n">last_seen</span><span class="p">.</span><span class="nf">get</span><span class="p">()[</span><span class="mi">0</span><span class="p">]</span>
        <span class="k">else</span><span class="p">:</span>
            <span class="n">latest_from_existing</span> <span class="o">=</span> <span class="n">datetime</span><span class="p">.</span><span class="nf">fromtimestamp</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span>

        <span class="c1"># If new data is more recent than existing state
</span>        <span class="k">if</span> <span class="n">latest_from_existing</span> <span class="o">&lt;</span> <span class="n">max_row</span><span class="p">[</span><span class="mi">1</span><span class="p">]:</span>
            <span class="c1"># Delete all existing timers
</span>            <span class="k">for</span> <span class="n">timer</span> <span class="ow">in</span> <span class="n">self</span><span class="p">.</span><span class="n">handle</span><span class="p">.</span><span class="nf">listTimers</span><span class="p">():</span>
                <span class="n">self</span><span class="p">.</span><span class="n">handle</span><span class="p">.</span><span class="nf">deleteTimer</span><span class="p">(</span><span class="n">timer</span><span class="p">)</span>

            <span class="c1"># Update the last seen timestamp
</span>            <span class="n">self</span><span class="p">.</span><span class="n">last_seen</span><span class="p">.</span><span class="nf">update</span><span class="p">((</span><span class="n">max_row</span><span class="p">[</span><span class="mi">1</span><span class="p">],))</span>

            <span class="c1"># Register a new timer for 5 seconds in the future
</span>            <span class="n">self</span><span class="p">.</span><span class="n">handle</span><span class="p">.</span><span class="nf">registerTimer</span><span class="p">(</span><span class="n">timerValues</span><span class="p">.</span><span class="nf">getCurrentProcessingTimeInMs</span><span class="p">()</span> <span class="o">+</span> <span class="mi">5000</span><span class="p">)</span>

        <span class="c1"># Yield an empty DataFrame
</span>        <span class="k">yield</span> <span class="n">pd</span><span class="p">.</span><span class="nc">DataFrame</span><span class="p">()</span>

    <span class="k">def</span> <span class="nf">close</span><span class="p">(</span><span class="n">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="bp">None</span><span class="p">:</span>
        <span class="c1"># No cleanup needed
</span>        <span class="k">pass</span></code></pre></figure>
</div>
<div data-lang="python_Row">
<figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">class</span> <span class="nc">DownTimeDetector</span><span class="p">(</span><span class="n">StatefulProcessor</span><span class="p">):</span>
    <span class="k">def</span> <span class="nf">init</span><span class="p">(</span><span class="n">self</span><span class="p">,</span> <span class="n">handle</span><span class="p">:</span> <span class="n">StatefulProcessorHandle</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="bp">None</span><span class="p">:</span>
        <span class="c1"># Define schema for the state value (timestamp)
</span>        <span class="n">state_schema</span> <span class="o">=</span> <span class="nc">StructType</span><span class="p">([</span><span class="nc">StructField</span><span class="p">(</span><span class="sh">"</span><span class="s">value</span><span class="sh">"</span><span class="p">,</span> <span class="nc">TimestampType</span><span class="p">(),</span> <span class="bp">True</span><span class="p">)])</span>
        <span class="n">self</span><span class="p">.</span><span class="n">handle</span> <span class="o">=</span> <span class="n">handle</span>
        <span class="c1"># Initialize state to store the last seen timestamp for each key
</span>        <span class="n">self</span><span class="p">.</span><span class="n">last_seen</span> <span class="o">=</span> <span class="n">handle</span><span class="p">.</span><span class="nf">getValueState</span><span class="p">(</span><span class="sh">"</span><span class="s">last_seen</span><span class="sh">"</span><span class="p">,</span> <span class="n">state_schema</span><span class="p">)</span>

    <span class="k">def</span> <span class="nf">handleExpiredTimer</span><span class="p">(</span><span class="n">self</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">timerValues</span><span class="p">,</span> <span class="n">expiredTimerInfo</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">Row</span><span class="p">]:</span>
        <span class="c1"># ValueState.get() returns a tuple; take the stored timestamp from it
</span>        <span class="n">latest_from_existing</span> <span class="o">=</span> <span class="n">self</span><span class="p">.</span><span class="n">last_seen</span><span class="p">.</span><span class="nf">get</span><span class="p">()[</span><span class="mi">0</span><span class="p">]</span>
        <span class="c1"># Calculate downtime duration
</span>        <span class="n">downtime_duration</span> <span class="o">=</span> <span class="n">timerValues</span><span class="p">.</span><span class="nf">getCurrentProcessingTimeInMs</span><span class="p">()</span> <span class="o">-</span> <span class="nf">int</span><span class="p">(</span><span class="n">latest_from_existing</span><span class="p">.</span><span class="nf">timestamp</span><span class="p">()</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">)</span>
        <span class="c1"># Register a new timer for 10 seconds in the future
</span>        <span class="n">self</span><span class="p">.</span><span class="n">handle</span><span class="p">.</span><span class="nf">registerTimer</span><span class="p">(</span><span class="n">timerValues</span><span class="p">.</span><span class="nf">getCurrentProcessingTimeInMs</span><span class="p">()</span> <span class="o">+</span> <span class="mi">10000</span><span class="p">)</span>
        <span class="c1"># Yield a Row with the key and downtime duration
</span>        <span class="k">yield</span> <span class="nc">Row</span><span class="p">(</span><span class="nb">id</span><span class="o">=</span><span class="n">key</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">timeValues</span><span class="o">=</span><span class="nf">str</span><span class="p">(</span><span class="n">downtime_duration</span><span class="p">))</span>

    <span class="k">def</span> <span class="nf">handleInputRows</span><span class="p">(</span><span class="n">self</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">rows</span><span class="p">,</span> <span class="n">timerValues</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">[</span><span class="n">Row</span><span class="p">]:</span>
        <span class="c1"># Find the maximum timestamp
</span>        <span class="n">max_timestamp</span> <span class="o">=</span> <span class="nf">max</span><span class="p">(</span><span class="nf">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">.</span><span class="n">timestamp</span><span class="p">,</span> <span class="n">rows</span><span class="p">))</span>

        <span class="c1"># Get the latest timestamp from existing state, or fall back to the epoch start if no state exists yet
</span>        <span class="k">if</span> <span class="n">self</span><span class="p">.</span><span class="n">last_seen</span><span class="p">.</span><span class="nf">exists</span><span class="p">():</span>
            <span class="n">latest_from_existing</span> <span class="o">=</span> <span class="n">self</span><span class="p">.</span><span class="n">last_seen</span><span class="p">.</span><span class="nf">get</span><span class="p">()[</span><span class="mi">0</span><span class="p">]</span>
        <span class="k">else</span><span class="p">:</span>
            <span class="n">latest_from_existing</span> <span class="o">=</span> <span class="n">datetime</span><span class="p">.</span><span class="nf">fromtimestamp</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span>

        <span class="c1"># If new data is more recent than existing state
</span>        <span class="k">if</span> <span class="n">latest_from_existing</span> <span class="o">&lt;</span> <span class="n">max_timestamp</span><span class="p">:</span>
            <span class="c1"># Delete all existing timers
</span>            <span class="k">for</span> <span class="n">timer</span> <span class="ow">in</span> <span class="n">self</span><span class="p">.</span><span class="n">handle</span><span class="p">.</span><span class="nf">listTimers</span><span class="p">():</span>
                <span class="n">self</span><span class="p">.</span><span class="n">handle</span><span class="p">.</span><span class="nf">deleteTimer</span><span class="p">(</span><span class="n">timer</span><span class="p">)</span>

            <span class="c1"># Update the last seen timestamp
</span>            <span class="n">self</span><span class="p">.</span><span class="n">last_seen</span><span class="p">.</span><span class="nf">update</span><span class="p">((</span><span class="n">max_timestamp</span><span class="p">,))</span>

            <span class="c1"># Register a new timer for 5 seconds in the future
</span>            <span class="n">self</span><span class="p">.</span><span class="n">handle</span><span class="p">.</span><span class="nf">registerTimer</span><span class="p">(</span><span class="n">timerValues</span><span class="p">.</span><span class="nf">getCurrentProcessingTimeInMs</span><span class="p">()</span> <span class="o">+</span> <span class="mi">5000</span><span class="p">)</span>

        <span class="c1"># No output rows are produced while processing input
</span>        <span class="k">return</span> <span class="nf">iter</span><span class="p">([])</span>

    <span class="k">def</span> <span class="nf">close</span><span class="p">(</span><span class="n">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="bp">None</span><span class="p">:</span>
        <span class="c1"># No cleanup needed
</span>        <span class="k">pass</span></code></pre></figure>
</div>
<div data-lang="scala">
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// The (String, Timestamp) schema represents an (id, time). We want to do downtime</span>
<span class="c1">// detection on every single unique sensor, where each sensor has a sensor ID.</span>
<span class="k">class</span> <span class="nc">DowntimeDetector</span><span class="o">(</span><span class="n">duration</span><span class="k">:</span> <span class="kt">Duration</span><span class="o">)</span> <span class="k">extends</span>
<span class="nc">StatefulProcessor</span><span class="o">[</span><span class="kt">String</span>, <span class="o">(</span><span class="kt">String</span>, <span class="kt">Timestamp</span><span class="o">)</span>, <span class="o">(</span><span class="kt">String</span>, <span class="kt">Duration</span><span class="o">)]</span> <span class="o">{</span>
<span class="nd">@transient</span> <span class="k">private</span> <span class="k">var</span> <span class="nc">_lastSeen</span><span class="k">:</span> <span class="kt">ValueState</span><span class="o">[</span><span class="kt">Timestamp</span><span class="o">]</span> <span class="k">=</span> <span class="k">_</span>
<span class="k">override</span> <span class="k">def</span> <span class="nf">init</span><span class="o">(</span><span class="n">outputMode</span><span class="k">:</span> <span class="kt">OutputMode</span><span class="o">,</span> <span class="n">timeMode</span><span class="k">:</span> <span class="kt">TimeMode</span><span class="o">)</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
<span class="nc">_lastSeen</span> <span class="k">=</span> <span class="nv">getHandle</span><span class="o">.</span><span class="py">getValueState</span><span class="o">[</span><span class="kt">Timestamp</span><span class="o">](</span><span class="s">"lastSeen"</span><span class="o">,</span> <span class="nv">Encoders</span><span class="o">.</span><span class="py">TIMESTAMP</span><span class="o">,</span> <span class="nv">TTLConfig</span><span class="o">.</span><span class="py">NONE</span><span class="o">)</span>
<span class="o">}</span>
<span class="c1">// The logic here is as follows: find the largest timestamp seen so far. Set a timer for</span>
<span class="c1">// the duration later.</span>
<span class="k">override</span> <span class="k">def</span> <span class="nf">handleInputRows</span><span class="o">(</span>
<span class="n">key</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span>
<span class="n">inputRows</span><span class="k">:</span> <span class="kt">Iterator</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Timestamp</span><span class="o">)],</span>
<span class="n">timerValues</span><span class="k">:</span> <span class="kt">TimerValues</span><span class="o">)</span><span class="k">:</span> <span class="kt">Iterator</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Duration</span><span class="o">)]</span> <span class="k">=</span> <span class="o">{</span>
<span class="k">val</span> <span class="nv">latestRecordFromNewRows</span> <span class="k">=</span> <span class="nv">inputRows</span><span class="o">.</span><span class="py">maxBy</span><span class="o">(</span><span class="nv">_</span><span class="o">.</span><span class="py">_2</span><span class="o">.</span><span class="py">getTime</span><span class="o">)</span>
<span class="k">val</span> <span class="nv">latestTimestampFromExistingRows</span> <span class="k">=</span> <span class="nf">if</span> <span class="o">(</span><span class="nv">_lastSeen</span><span class="o">.</span><span class="py">exists</span><span class="o">())</span> <span class="o">{</span>
<span class="nv">_lastSeen</span><span class="o">.</span><span class="py">get</span><span class="o">()</span>
<span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
<span class="k">new</span> <span class="nc">Timestamp</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">}</span>
<span class="k">val</span> <span class="nv">latestTimestampFromNewRows</span> <span class="k">=</span> <span class="nv">latestRecordFromNewRows</span><span class="o">.</span><span class="py">_2</span>
<span class="nf">if</span> <span class="o">(</span><span class="nv">latestTimestampFromNewRows</span><span class="o">.</span><span class="py">after</span><span class="o">(</span><span class="n">latestTimestampFromExistingRows</span><span class="o">))</span> <span class="o">{</span>
<span class="c1">// Cancel the one existing timer, since we have a new latest timestamp.</span>
<span class="c1">// We call "listTimers()" just because we don't know ahead of time what</span>
<span class="c1">// the timestamp of the existing timer is.</span>
<span class="nv">getHandle</span><span class="o">.</span><span class="py">listTimers</span><span class="o">().</span><span class="py">foreach</span><span class="o">(</span><span class="n">timer</span> <span class="k">=&gt;</span> <span class="nv">getHandle</span><span class="o">.</span><span class="py">deleteTimer</span><span class="o">(</span><span class="n">timer</span><span class="o">))</span>
<span class="nv">_lastSeen</span><span class="o">.</span><span class="py">update</span><span class="o">(</span><span class="n">latestTimestampFromNewRows</span><span class="o">)</span>
<span class="c1">// Use timerValues to schedule a timer using processing time.</span>
<span class="nv">getHandle</span><span class="o">.</span><span class="py">registerTimer</span><span class="o">(</span><span class="nv">timerValues</span><span class="o">.</span><span class="py">getCurrentProcessingTimeInMs</span><span class="o">()</span> <span class="o">+</span> <span class="nv">duration</span><span class="o">.</span><span class="py">toMillis</span><span class="o">)</span>
<span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
<span class="c1">// No new latest timestamp, so no need to update state or set a timer.</span>
<span class="o">}</span>
<span class="nv">Iterator</span><span class="o">.</span><span class="py">empty</span>
<span class="o">}</span>
<span class="k">override</span> <span class="k">def</span> <span class="nf">handleExpiredTimer</span><span class="o">(</span>
<span class="n">key</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span>
<span class="n">timerValues</span><span class="k">:</span> <span class="kt">TimerValues</span><span class="o">,</span>
<span class="n">expiredTimerInfo</span><span class="k">:</span> <span class="kt">ExpiredTimerInfo</span><span class="o">)</span><span class="k">:</span> <span class="kt">Iterator</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Duration</span><span class="o">)]</span> <span class="k">=</span> <span class="o">{</span>
<span class="k">val</span> <span class="nv">latestTimestamp</span> <span class="k">=</span> <span class="nv">_lastSeen</span><span class="o">.</span><span class="py">get</span><span class="o">()</span>
<span class="k">val</span> <span class="nv">downtimeDuration</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Duration</span><span class="o">(</span>
<span class="nv">timerValues</span><span class="o">.</span><span class="py">getCurrentProcessingTimeInMs</span><span class="o">()</span> <span class="o">-</span> <span class="nv">latestTimestamp</span><span class="o">.</span><span class="py">getTime</span><span class="o">)</span>
<span class="c1">// Register another timer that will fire in 10 seconds.</span>
<span class="c1">// Timers can be registered anywhere but init()</span>
<span class="nv">getHandle</span><span class="o">.</span><span class="py">registerTimer</span><span class="o">(</span><span class="nv">timerValues</span><span class="o">.</span><span class="py">getCurrentProcessingTimeInMs</span><span class="o">()</span> <span class="o">+</span> <span class="mi">10000</span><span class="o">)</span>
<span class="nc">Iterator</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">downtimeDuration</span><span class="o">))</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></figure>
</div>
</div>
<h3 id="using-the-statefulprocessor-in-a-streaming-query">Using the StatefulProcessor in a streaming query</h3>
<p>Now that we have defined the <code class="language-plaintext highlighter-rouge">StatefulProcessor</code>, we can use it in a streaming query. The following code snippets show how to use the <code class="language-plaintext highlighter-rouge">StatefulProcessor</code> in a streaming query in Python and Scala.</p>
<div class="codetabs">
<div data-lang="python_Pandas">
<figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">q</span> <span class="o">=</span> <span class="p">(</span><span class="n">df</span><span class="p">.</span><span class="nf">groupBy</span><span class="p">(</span><span class="sh">"</span><span class="s">key</span><span class="sh">"</span><span class="p">)</span>
<span class="p">.</span><span class="nf">transformWithStateInPandas</span><span class="p">(</span>
<span class="n">statefulProcessor</span><span class="o">=</span><span class="nc">DownTimeDetector</span><span class="p">(),</span>
<span class="n">outputStructType</span><span class="o">=</span><span class="n">output_schema</span><span class="p">,</span>
<span class="n">outputMode</span><span class="o">=</span><span class="sh">"</span><span class="s">Update</span><span class="sh">"</span><span class="p">,</span>
<span class="n">timeMode</span><span class="o">=</span><span class="sh">"</span><span class="s">ProcessingTime</span><span class="sh">"</span><span class="p">,</span>
<span class="p">)</span>
<span class="p">.</span><span class="n">writeStream</span><span class="bp">...</span></code></pre></figure>
</div>
<div data-lang="python_Row">
<figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">q</span> <span class="o">=</span> <span class="p">(</span><span class="n">df</span><span class="p">.</span><span class="nf">groupBy</span><span class="p">(</span><span class="sh">"</span><span class="s">key</span><span class="sh">"</span><span class="p">)</span>
<span class="p">.</span><span class="nf">transformWithState</span><span class="p">(</span>
<span class="n">statefulProcessor</span><span class="o">=</span><span class="nc">DownTimeDetector</span><span class="p">(),</span>
<span class="n">outputStructType</span><span class="o">=</span><span class="n">output_schema</span><span class="p">,</span>
<span class="n">outputMode</span><span class="o">=</span><span class="sh">"</span><span class="s">Update</span><span class="sh">"</span><span class="p">,</span>
<span class="n">timeMode</span><span class="o">=</span><span class="sh">"</span><span class="s">ProcessingTime</span><span class="sh">"</span><span class="p">,</span>
<span class="p">)</span>
<span class="p">.</span><span class="n">writeStream</span><span class="bp">...</span>
</code></pre></figure>
</div>
<div data-lang="scala">
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">query</span> <span class="k">=</span> <span class="nv">df</span><span class="o">.</span><span class="py">groupBy</span><span class="o">(</span><span class="s">"key"</span><span class="o">)</span>
<span class="o">.</span><span class="py">transformWithState</span><span class="o">(</span>
<span class="n">statefulProcessor</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">DowntimeDetector</span><span class="o">(</span><span class="n">duration</span><span class="o">),</span>
<span class="n">outputMode</span> <span class="k">=</span> <span class="nv">OutputMode</span><span class="o">.</span><span class="py">Update</span><span class="o">,</span>
<span class="n">timeMode</span> <span class="k">=</span> <span class="nv">TimeMode</span><span class="o">.</span><span class="py">ProcessingTime</span><span class="o">())</span>
<span class="o">.</span><span class="py">writeStream</span><span class="o">...</span></code></pre></figure>
</div>
</div>
<h2 id="state-schema-evolution">State Schema Evolution</h2>
<p>TransformWithState also supports schema evolution of the managed state. There are two parts to this:</p>
<ul>
<li>evolution across state variables</li>
<li>evolution within a state variable</li>
</ul>
<p>Note that schema evolution is only supported on the value side. Key side state schema evolution is not supported.</p>
<h3 id="evolution-across-state-variables">Evolution across state variables</h3>
<p>This operator allows for state variables to be added and removed across different runs of the same streaming query. In order to remove a variable, we also need to inform the engine so that the underlying state can be purged. Users can achieve this by invoking the <code class="language-plaintext highlighter-rouge">deleteIfExists</code> method for a given state variable within the <code class="language-plaintext highlighter-rouge">init</code> method of the StatefulProcessor.</p>
<h3 id="evolution-within-a-state-variable">Evolution within a state variable</h3>
<p>This operator also allows the state schema of a specific state variable to be evolved. For example, if you are using a case class to store the state within a <code class="language-plaintext highlighter-rouge">ValueState</code> variable, you can evolve this case class by adding, removing, or widening fields.
We support such schema evolution only when the underlying encoding format is set to <code class="language-plaintext highlighter-rouge">Avro</code>. To enable this, set the following Spark config: <code class="language-plaintext highlighter-rouge">spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")</code>.</p>
<p>The following evolution operations are supported under Avro&#8217;s schema evolution rules:</p>
<ul>
<li>Adding a new field</li>
<li>Removing a field</li>
<li>Type widening</li>
<li>Reordering fields</li>
</ul>
<p>The following evolution operations are not supported:</p>
<ul>
<li>Renaming a field</li>
<li>Type narrowing</li>
</ul>
<h2 id="integration-with-state-data-source">Integration with State Data Source</h2>
<p>TransformWithState is a stateful operator that allows users to maintain arbitrary state across batches. In order to read this state, the user needs to provide some additional options in the state data source reader query.
This operator allows for multiple state variables to be used within the same query. However, because they could be of different composite types and encoding formats, they need to be read within a batch query one variable at a time.
In order to allow this, the user needs to specify the <code class="language-plaintext highlighter-rouge">stateVarName</code> for the state variable they are interested in reading.</p>
<p>Timers can be read by setting the option <code class="language-plaintext highlighter-rouge">readRegisteredTimers</code> to true. This will return all the registered timers across grouping keys.</p>
<p>Composite type variables can also be read in two formats:</p>
<ul>
<li>Flattened: This is the default format where the composite types are flattened out into individual columns.</li>
<li>Non-flattened: This is where the composite types are returned as a single column of Array or Map type in Spark SQL.</li>
</ul>
<p>Depending on your memory requirements, you can choose the format that best suits your use case.
More information about source options can be found in the <a href="./structured-streaming-state-data-source.html">State Data Source guide</a>.</p>
</div>
<!-- /container -->
</div>
<script src="../js/vendor/jquery-3.5.1.min.js"></script>
<script src="../js/vendor/bootstrap.bundle.min.js"></script>
<script src="../js/vendor/anchor.min.js"></script>
<script src="../js/main.js"></script>
<script type="text/javascript" src="../js/vendor/docsearch.min.js"></script>
<script type="text/javascript">
// Bind the Algolia DocSearch dropdown to the page's search box.
//
// DocSearch is free and automated, and has two halves:
//   1. A crawler, run by DocSearch every 24 hours, that follows every link on
//      the site, extracts the content of each page it visits, and pushes that
//      content into an Algolia index.
//   2. This client-side snippet, which binds that Algolia index to the search
//      input and shows the matching results in a dropdown UI.
// See the DocSearch documentation for more details on how it works.
(function () {
  // Only return hits from pages that belong to this documentation version.
  var versionFacetFilters = ["version:4.1.0-preview1"];

  var searchOptions = {
    apiKey: 'd62f962a82bc9abb53471cb7b89da35e',
    appId: 'RAI69RXRSK',
    indexName: 'apache_spark',
    inputSelector: '#docsearch-input',
    enhancedSearchInput: true,
    algoliaOptions: {
      'facetFilters': versionFacetFilters
    },
    debug: false // Set to true to keep the dropdown open for inspection
  };

  docsearch(searchOptions);
})();
</script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
// Configure MathJax's TeX input: equation numbering uses the "AMS" autoNumber mode.
(function () {
  var texInputOptions = {
    equationNumbers: { autoNumber: "AMS" }
  };
  MathJax.Hub.Config({ TeX: texInputOptions });
})();
</script>
<script>
// Inject the MathJax 2.7.1 loader from cdnjs asynchronously, then configure its
// tex2jax preprocessor once the script has loaded.
//
// The URL is always https://. A protocol-relative "//cdnjs..." URL would resolve
// against file:// when the docs are opened from disk. The previous protocol
// sniffing fell back to plain http:// on both http: and file: pages, which
// fetched executable code over an unauthenticated connection. An explicit
// https:// URL loads correctly from http:, https: and file: pages alike.
(function (d) {
  var script = d.createElement('script');
  script.type = 'text/javascript';
  script.async = true;
  script.onload = function () {
    MathJax.Hub.Config({
      tex2jax: {
        // NOTE(review): the JS literal "\\\\(" is the delimiter \\( (two
        // backslashes), unlike "\\[" which is \[ (one backslash). This is kept
        // unchanged; confirm it matches how the markdown renders inline math.
        inlineMath: [["$", "$"], ["\\\\(", "\\\\)"]],
        displayMath: [["$$", "$$"], ["\\[", "\\]"]],
        processEscapes: true,
        // tex2jax does not look for math inside these elements (e.g. code listings).
        skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
      }
    });
  };
  script.src = 'https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' +
    '?config=TeX-AMS-MML_HTMLorMML';
  d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>