<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
<!-- h1>Developer Guide for Kafka Streams</h1 -->
<div class="sub-nav-sticky">
<div class="sticky-top">
<!-- div style="height:35px">
<a href="/{{version}}/documentation/streams/">Introduction</a>
<a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
<a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
<a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
<a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
</div -->
</div>
</div>
<div class="section" id="processor-api">
<span id="streams-developer-guide-processor-api"></span><h1>Processor API<a class="headerlink" href="#processor-api" title="Permalink to this headline"></a></h1>
<p>The Processor API allows developers to define and connect custom processors and to interact with state stores. With the
Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these
processors with their associated state stores to compose a processor topology that represents your custom processing
logic.</p>
<div class="contents local topic" id="table-of-contents">
<p class="topic-title first"><b>Table of Contents</b></p>
<ul class="simple">
<li><a class="reference internal" href="#overview" id="id1">Overview</a></li>
<li><a class="reference internal" href="#defining-a-stream-processor" id="id2">Defining a Stream
Processor</a></li>
<li><a class="reference internal" href="#unit-testing-processors" id="id9">Unit Testing Processors</a></li>
<li><a class="reference internal" href="#state-stores" id="id3">State Stores</a>
<ul>
<li><a class="reference internal" href="#defining-and-creating-a-state-store" id="id4">Defining and creating a State Store</a></li>
<li><a class="reference internal" href="#fault-tolerant-state-stores" id="id5">Fault-tolerant State Stores</a></li>
<li><a class="reference internal" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" id="id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a></li>
<li><a class="reference internal" href="#timestamped-state-stores" id="id11">Timestamped State Stores</a></li>
<li><a class="reference internal" href="#implementing-custom-state-stores" id="id7">Implementing Custom State Stores</a></li>
</ul>
</li>
<li><a class="reference internal" href="#connecting-processors-and-state-stores" id="id8">Connecting Processors and State Stores</a></li>
<li><a class="reference internal" href="#accessing-processor-context" id="id10">Accessing Processor Context</a></li>
</ul>
</div>
<div class="section" id="overview">
<h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#overview" title="Permalink to this headline"></a></h2>
<p>The Processor API can be used to implement both <strong>stateless</strong> and <strong>stateful</strong> operations, where the latter is
achieved through the use of <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a>.</p>
<div class="admonition tip">
<p><b>Tip</b></p>
<p class="last"><strong>Combining the DSL and the Processor API:</strong>
You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the
section <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-process"><span class="std std-ref">Applying processors and transformers (Processor API integration)</span></a>.</p>
</div>
<p>For a complete list of available API functionality, see the <a href="/{{version}}/javadoc/org/apache/kafka/streams/package-summary.html">Streams</a> API docs.</p>
</div>
<div class="section" id="defining-a-stream-processor">
<span id="streams-developer-guide-stream-processor"></span><h2><a class="toc-backref" href="#id2">Defining a Stream Processor</a><a class="headerlink" href="#defining-a-stream-processor" title="Permalink to this headline"></a></h2>
<p>A <a class="reference internal" href="../core-concepts.html#streams_processor_node"><span class="std std-ref">stream processor</span></a> is a node in the processor topology that represents a single processing step.
With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect
these processors with their associated state stores to compose the processor topology.</p>
<p>You can define a customized stream processor by implementing the <code class="docutils literal"><span class="pre">Processor</span></code> interface, which provides the <code class="docutils literal"><span class="pre">process()</span></code> API method.
The <code class="docutils literal"><span class="pre">process()</span></code> method is called on each of the received records.</p>
<p>The <code class="docutils literal"><span class="pre">Processor</span></code> interface also has an <code class="docutils literal"><span class="pre">init()</span></code> method, which is called by the Kafka Streams library during task construction
phase. Processor instances should perform any required initialization in this method. The <code class="docutils literal"><span class="pre">init()</span></code> method passes in a <code class="docutils literal"><span class="pre">ProcessorContext</span></code>
instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>),
and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).
Any resources you set up in <code class="docutils literal"><span class="pre">init()</span></code> can be cleaned up in the
<code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
<code class="docutils literal"><span class="pre">Processor</span></code> object by calling
<code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
<p>When records are forwarded to downstream processors, they are also assigned a timestamp. There are two default behaviors:
(1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code>, the output record inherits the input record's timestamp.
(2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code>, the output record inherits the current punctuation timestamp (either the current 'stream time' or the system wall-clock time).
Note that <code class="docutils literal"><span class="pre">#forward()</span></code> also lets you override the default behavior by passing a custom timestamp for the output record.</p>
<p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
for the punctuation scheduling: either <a class="reference internal" href="../core-concepts.html#streams_time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
is configured to represent event-time via <code class="docutils literal"><span class="pre">TimestampExtractor</span></code>). When stream-time is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely
by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there
is no new input data arriving, stream-time is not advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called.</p>
<p>For example, if you schedule a <code class="docutils literal"><span class="pre">Punctuator</span></code> function every 10 seconds based on <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> and if you
process a stream of 60 records with consecutive timestamps from 1 second (first record) to 60 seconds (last record),
then <code class="docutils literal"><span class="pre">punctuate()</span></code> would be called 6 times. This happens regardless of the time required to actually process those records:
<code class="docutils literal"><span class="pre">punctuate()</span></code> would be called 6 times whether processing these 60 records takes a second, a minute, or an hour.</p>
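<p>The stream-time example above can be reproduced with a small, self-contained simulation. This is only a sketch under simplifying assumptions, not how Kafka Streams implements punctuation: the class <code>StreamTimePunctuationSketch</code> and its method are hypothetical names, stream-time is modeled as the largest record timestamp seen so far, and a punctuation is assumed to fire each time stream-time reaches the next multiple of the interval.</p>

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Simplified model of stream-time punctuation; NOT the Kafka Streams implementation.
class StreamTimePunctuationSketch {

    // Returns the stream-time values (in milliseconds) at which a punctuator would fire,
    // assuming it fires each time stream-time reaches the next multiple of the interval.
    static List<Long> punctuationTimes(long[] recordTimestampsMs, Duration interval) {
        List<Long> fired = new ArrayList<>();
        long intervalMs = interval.toMillis();
        long nextPunctuationMs = intervalMs;
        long streamTimeMs = 0L;
        for (long timestampMs : recordTimestampsMs) {
            // Stream-time only moves forward and is driven purely by record timestamps.
            streamTimeMs = Math.max(streamTimeMs, timestampMs);
            while (streamTimeMs >= nextPunctuationMs) {
                fired.add(nextPunctuationMs);
                nextPunctuationMs += intervalMs;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] timestamps = new long[60];
        for (int i = 0; i < 60; i++) {
            timestamps[i] = (i + 1) * 1000L; // 1 s, 2 s, ..., 60 s
        }
        List<Long> fired = punctuationTimes(timestamps, Duration.ofSeconds(10));
        System.out.println(fired.size()); // prints 6
    }
}
```

<p>Because the count depends only on the record timestamps, the result is the same no matter how long the loop itself takes to run. With no input records, the method returns an empty list, which mirrors the statement that <code>punctuate()</code> is not called when no new data arrives.</p>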
<p>When wall-clock-time (i.e. <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely by the wall-clock time.
Reusing the example above, if the <code class="docutils literal"><span class="pre">Punctuator</span></code> function is scheduled based on <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>, and if these
60 records were processed within 20 seconds, <code class="docutils literal"><span class="pre">punctuate()</span></code> is called twice (once every 10 seconds). If these 60 records
were processed within 5 seconds, then <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called at all. Note that you can schedule multiple <code class="docutils literal"><span class="pre">Punctuator</span></code>
callbacks with different <code class="docutils literal"><span class="pre">PunctuationType</span></code> types within the same processor by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> multiple
times inside the <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
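<p>The wall-clock arithmetic above can be written down as a short sketch. The class and method names are hypothetical, and the model assumes that the first punctuation fires one full interval after scheduling; it is an illustration of the numbers in the text, not the library's scheduling code.</p>

```java
import java.time.Duration;

// Simplified model of wall-clock punctuation; NOT the Kafka Streams implementation.
class WallClockPunctuationSketch {

    // Number of punctuations within a span of elapsed wall-clock time, assuming the
    // first one fires one full interval after scheduling. The number of records
    // processed during that span plays no role.
    static long punctuationCount(Duration elapsed, Duration interval) {
        return elapsed.toMillis() / interval.toMillis();
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofSeconds(10);
        System.out.println(punctuationCount(Duration.ofSeconds(20), interval)); // prints 2
        System.out.println(punctuationCount(Duration.ofSeconds(5), interval));  // prints 0
    }
}
```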
<div class="admonition attention">
<p class="first admonition-title"><b>Attention</b></p>
<p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
This behavior is independent of the configured timestamp extractor, i.e., using <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code> does not enable wall-clock triggering of <code class="docutils literal"><span class="pre">punctuate()</span></code>.</p>
</div>
<p><b>Example</b></p>
<p>The following example <code class="docutils literal"><span class="pre">Processor</span></code> defines a simple word-count algorithm and the following actions are performed:</p>
<ul class="simple">
<li>In the <code class="docutils literal"><span class="pre">init()</span></code> method, schedule the punctuation every second (1000 milliseconds) and retrieve the local state store by its name &#8220;Counts&#8221;.</li>
<li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts in the state store (state stores are discussed later in this section).</li>
<li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
</ul>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCountProcessor</span> <span class="kd">implements</span> <span class="n">Processor</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">private</span> <span class="n">ProcessorContext</span> <span class="n">context</span><span class="o">;</span>
<span class="kd">private</span> <span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">kvStore</span><span class="o">;</span>
<span class="nd">@Override</span>
<span class="nd">@SuppressWarnings</span><span class="o">(</span><span class="s">&quot;unchecked&quot;</span><span class="o">)</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">ProcessorContext</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// keep the processor context locally because we need it in punctuate() and commit()</span>
<span class="k">this</span><span class="o">.</span><span class="na">context</span> <span class="o">=</span> <span class="n">context</span><span class="o">;</span>
<span class="c1">// retrieve the key-value store named &quot;Counts&quot;</span>
<span class="n">kvStore</span> <span class="o">=</span> <span class="o">(</span><span class="n">KeyValueStore</span><span class="o">)</span> <span class="n">context</span><span class="o">.</span><span class="na">getStateStore</span><span class="o">(</span><span class="s">&quot;Counts&quot;</span><span class="o">);</span>
<span class="c1">// schedule a punctuate() method every second based on stream-time</span>
<span class="k">this</span><span class="o">.</span><span class="na">context</span><span class="o">.</span><span class="na">schedule</span><span class="o">(</span><span class="na">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">1</span><span class="o">),</span> <span class="n">PunctuationType</span><span class="o">.</span><span class="na">STREAM_TIME</span><span class="o">,</span> <span class="o">(</span><span class="n">timestamp</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">KeyValueIterator</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">iter</span> <span class="o">=</span> <span class="k">this</span><span class="o">.</span><span class="na">kvStore</span><span class="o">.</span><span class="na">all</span><span class="o">();</span>
<span class="k">while</span> <span class="o">(</span><span class="n">iter</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
<span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">entry</span> <span class="o">=</span> <span class="n">iter</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
<span class="n">context</span><span class="o">.</span><span class="na">forward</span><span class="o">(</span><span class="n">entry</span><span class="o">.</span><span class="na">key</span><span class="o">,</span> <span class="n">entry</span><span class="o">.</span><span class="na">value</span><span class="o">.</span><span class="na">toString</span><span class="o">());</span>
<span class="o">}</span>
<span class="n">iter</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
<span class="c1">// commit the current processing progress</span>
<span class="n">context</span><span class="o">.</span><span class="na">commit</span><span class="o">();</span>
<span class="o">});</span>
<span class="o">}</span>
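<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="n">String</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">line</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// split the value into words and update each word&#39;s count in the state store</span>
<span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">line</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span> <span class="o">{</span>
<span class="n">Long</span> <span class="n">oldValue</span> <span class="o">=</span> <span class="k">this</span><span class="o">.</span><span class="na">kvStore</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">word</span><span class="o">);</span>
<span class="k">this</span><span class="o">.</span><span class="na">kvStore</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="n">oldValue</span> <span class="o">==</span> <span class="kc">null</span> <span class="o">?</span> <span class="mi">1L</span> <span class="o">:</span> <span class="n">oldValue</span> <span class="o">+</span> <span class="mi">1L</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>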
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">close</span><span class="o">()</span> <span class="o">{</span>
<span class="c1">// close any resources managed by this processor</span>
<span class="c1">// Note: Do not close any StateStores as these are managed by the library</span>
<span class="o">}</span>
<span class="o">}</span></pre></div>
</div>
<div class="admonition note">
<p><b>Note</b></p>
<p class="last"><strong>Stateful processing with state stores:</strong>
The <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> defined above can access the currently received record in its <code class="docutils literal"><span class="pre">process()</span></code> method, and it can
leverage <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a> to maintain processing states to, for example, remember recently
arrived records for stateful processing needs like aggregations and joins. For more information, see the <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a> documentation.</p>
</div>
</div>
<div class="section" id="unit-testing-processors">
<h2>
<a class="toc-backref" href="#id9">Unit Testing Processors</a>
<a class="headerlink" href="#unit-testing-processors" title="Permalink to this headline"></a>
</h2>
<p>
Kafka Streams comes with a <code>test-utils</code> module to help you write unit tests for your
processors. For details, see <a href="testing.html#unit-testing-processors">Unit Testing Processors</a>.
</p>
</div>
<div class="section" id="state-stores">
<span id="streams-developer-guide-state-store"></span><h2><a class="toc-backref" href="#id3">State Stores</a><a class="headerlink" href="#state-stores" title="Permalink to this headline"></a></h2>
<p>To implement a <strong>stateful</strong> <code class="docutils literal"><span class="pre">Processor</span></code> or <code class="docutils literal"><span class="pre">Transformer</span></code>, you must provide one or more state stores to the processor
or transformer (<em>stateless</em> processors or transformers do not need state stores). State stores can be used to remember
recently received input records, to track rolling aggregates, to de-duplicate input records, and more.
Another feature of state stores is that they can be
<a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactively queried</span></a> from other applications, such as a
NodeJS-based dashboard or a microservice implemented in Scala or Go.</p>
<p>The
<a class="reference internal" href="#streams-developer-guide-state-store-defining"><span class="std std-ref">available state store types</span></a> in Kafka Streams have
<a class="reference internal" href="#streams-developer-guide-state-store-fault-tolerance"><span class="std std-ref">fault tolerance</span></a> enabled by default.</p>
<div class="section" id="defining-and-creating-a-state-store">
<span id="streams-developer-guide-state-store-defining"></span><h3><a class="toc-backref" href="#id4">Defining and creating a State Store</a><a class="headerlink" href="#defining-and-creating-a-state-store" title="Permalink to this headline"></a></h3>
<p>You can either use one of the available store types or
<a class="reference internal" href="#streams-developer-guide-state-store-custom"><span class="std std-ref">implement your own custom store type</span></a>.
It&#8217;s common practice to leverage an existing store type via the <code class="docutils literal"><span class="pre">Stores</span></code> factory.</p>
<p>Note that, when using Kafka Streams, you normally don&#8217;t create or instantiate state stores directly in your code.
Rather, you define state stores indirectly by creating a so-called <code class="docutils literal"><span class="pre">StoreBuilder</span></code>. This builder is used by
Kafka Streams as a factory to instantiate the actual state stores locally in application instances when and where
needed.</p>
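<p>The builder-as-factory idea can be illustrated without any Kafka dependency. The sketch below is a hypothetical stand-in and does not use the Kafka Streams <code>StoreBuilder</code> API: <code>CountStoreBuilderSketch</code> is an invented name, and a plain <code>HashMap</code> plays the role of the store. It only shows why a definition object is handed over instead of a store instance: every caller that needs a store gets its own fresh one.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the builder idea; NOT the Kafka Streams StoreBuilder API.
class CountStoreBuilderSketch {
    private final String storeName;

    CountStoreBuilderSketch(String storeName) {
        this.storeName = storeName;
    }

    String name() {
        return storeName;
    }

    // Each call creates a new, empty store instance (here simply a HashMap).
    Map<String, Long> build() {
        return new HashMap<>();
    }

    public static void main(String[] args) {
        CountStoreBuilderSketch builder = new CountStoreBuilderSketch("Counts");
        // Two consumers of the same definition get independent store instances.
        Map<String, Long> storeForFirstTask = builder.build();
        Map<String, Long> storeForSecondTask = builder.build();
        storeForFirstTask.merge("kafka", 1L, Long::sum);
        System.out.println(storeForFirstTask.get("kafka"));          // prints 1
        System.out.println(storeForSecondTask.containsKey("kafka")); // prints false
    }
}
```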
<p>The following store types are available out of the box.</p>
<table border="1" class="non-scrolling-table width-100-percent docutils">
<colgroup>
<col width="19%" />
<col width="11%" />
<col width="18%" />
<col width="51%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Store Type</th>
<th class="head">Storage Engine</th>
<th class="head">Fault-tolerant?</th>
<th class="head">Description</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td>Persistent
<code class="docutils literal"><span class="pre">KeyValueStore&lt;K,</span> <span class="pre">V&gt;</span></code></td>
<td>RocksDB</td>
<td>Yes (enabled by default)</td>
<td><ul class="first simple">
<li><strong>The recommended store type for most use cases.</strong></li>
<li>Stores its data on local disk.</li>
<li>Storage capacity:
managed local state can be larger than the memory (heap space) of an
application instance, but must fit into the available local disk
space.</li>
<li>RocksDB settings can be fine-tuned, see
<a class="reference internal" href="config-streams.html#streams-developer-guide-rocksdb-config"><span class="std std-ref">RocksDB configuration</span></a>.</li>
<li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore-java.lang.String-">store variants</a>:
time window key-value store, session window key-value store.</li>
<li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore-java.lang.String-">persistentTimestampedKeyValueStore</a>
when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries.</li>
<li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore-java.lang.String-java.time.Duration-java.time.Duration-boolean-">persistentTimestampedWindowStore</a>
when you need a persistent windowedKey-(value/timestamp) store.</li>
</ul>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating a persistent key-value store:</span>
<span class="c1">// here, we create a `KeyValueStore&lt;String, Long&gt;` named &quot;persistent-counts&quot;.</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>
<span class="c1">// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.</span>
<span class="n">StoreBuilder</span><span class="o">&lt;</span><span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">countStoreSupplier</span> <span class="o">=</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">&quot;persistent-counts&quot;</span><span class="o">),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span>
<span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span></pre></div>
</div>
</td>
</tr>
<tr class="row-odd"><td>In-memory
<code class="docutils literal"><span class="pre">KeyValueStore&lt;K,</span> <span class="pre">V&gt;</span></code></td>
<td>-</td>
<td>Yes (enabled by default)</td>
<td><ul class="first simple">
<li>Stores its data in memory.</li>
<li>Storage capacity:
managed local state must fit into memory (heap space) of an
application instance.</li>
<li>Useful when application instances run in an environment where local
disk space is either not available or local disk space is wiped
in-between app instance restarts.</li>
<li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-">store variants</a>:
time window key-value store, session window key-value store.</li>
<li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html">TimestampedKeyValueStore</a>
when you need a key-(value/timestamp) store that supports put/get/delete and range queries.</li>
<li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html">TimestampedWindowStore</a>
when you need to store windowedKey-(value/timestamp) pairs.</li>
</ul>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating an in-memory key-value store:</span>
<span class="c1">// here, we create a `KeyValueStore&lt;String, Long&gt;` named &quot;inmemory-counts&quot;.</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>
<span class="c1">// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.</span>
<span class="n">StoreBuilder</span><span class="o">&lt;</span><span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">countStoreSupplier</span> <span class="o">=</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">inMemoryKeyValueStore</span><span class="o">(</span><span class="s">&quot;inmemory-counts&quot;</span><span class="o">),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span>
<span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span></pre></div>
</div>
</td>
</tr>
</tbody>
</table>
</div>
<div class="section" id="fault-tolerant-state-stores">
<span id="streams-developer-guide-state-store-fault-tolerance"></span><h3><a class="toc-backref" href="#id5">Fault-tolerant State Stores</a><a class="headerlink" href="#fault-tolerant-state-stores" title="Permalink to this headline"></a></h3>
<p>To make state stores fault-tolerant and to allow for state store migration without data loss, a state store can be
continuously backed up to a Kafka topic behind the scenes. This topic is sometimes referred to as the state store&#8217;s
associated <em>changelog topic</em>, or its <em>changelog</em>. Migration happens, for example, when a stateful stream task moves from one
machine to another while you are <a class="reference internal" href="running-app.html#streams-developer-guide-execution-scaling"><span class="std std-ref">elastically adding or removing capacity from your application</span></a>.
Similarly, if you experience machine failure, the state store and the application&#8217;s state can be fully restored from its changelog. You can
<a class="reference internal" href="#streams-developer-guide-state-store-enable-disable-fault-tolerance"><span class="std std-ref">enable or disable this backup feature</span></a> for a
state store.</p>
<p>Fault-tolerant state stores are backed by a
<a class="reference external" href="https://kafka.apache.org/documentation.html#compaction">compacted</a> changelog topic. The purpose of compacting this
topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster,
and to minimize recovery time if a state store needs to be restored from its changelog topic.</p>
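<p>Why compaction shortens recovery without changing the restored state can be seen in a minimal sketch. It models a changelog as an in-memory list of key-value updates; the class <code>ChangelogSketch</code> and its methods are hypothetical and say nothing about how the broker's log cleaner or the Streams restore process are implemented. Deletes (tombstones) are left out for brevity.</p>

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory model of a compacted changelog; NOT Kafka's log cleaner.
class ChangelogSketch {

    // Creates one changelog record: a key and the value written for it.
    static Map.Entry<String, Long> record(String key, Long value) {
        return new SimpleImmutableEntry<>(key, value);
    }

    // Compaction keeps only the most recent record per key.
    static List<Map.Entry<String, Long>> compact(List<Map.Entry<String, Long>> log) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> update : log) {
            latest.remove(update.getKey()); // re-insert so the key moves to its latest position
            latest.put(update.getKey(), update.getValue());
        }
        List<Map.Entry<String, Long>> compacted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : latest.entrySet()) {
            compacted.add(record(entry.getKey(), entry.getValue()));
        }
        return compacted;
    }

    // Restoring replays the changelog in order; later records overwrite earlier ones.
    static Map<String, Long> restore(List<Map.Entry<String, Long>> log) {
        Map<String, Long> store = new HashMap<>();
        for (Map.Entry<String, Long> update : log) {
            store.put(update.getKey(), update.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> log = new ArrayList<>();
        log.add(record("a", 1L));
        log.add(record("b", 1L));
        log.add(record("a", 2L));
        log.add(record("a", 3L));

        List<Map.Entry<String, Long>> compacted = compact(log);
        System.out.println(compacted);                                // prints [b=1, a=3]
        System.out.println(restore(log).equals(restore(compacted)));  // prints true
    }
}
```

<p>In this model the compacted log has two records instead of four, yet replaying either log produces the same store contents.</p>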
<p>Fault-tolerant windowed state stores are backed by a topic that uses both compaction and
deletion. Because of the structure of the message keys that are being sent to the changelog topics, this combination of
deletion and compaction is required for the changelog topics of window stores. For window stores, the message keys are
composite keys that include the &#8220;normal&#8221; key and window timestamps. For these types of composite keys it would not
be sufficient to only enable compaction to prevent a changelog topic from growing out of bounds. With deletion
enabled, old windows that have expired will be cleaned up by Kafka&#8217;s log cleaner as the log segments expire. The
default retention setting is <code class="docutils literal"><span class="pre">Windows#maintainMs()</span></code> + 1 day. You can override this setting by specifying
<code class="docutils literal"><span class="pre">StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG</span></code> in the <code class="docutils literal"><span class="pre">StreamsConfig</span></code>.</p>
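<p>The need for deletion on top of compaction can be illustrated with a small model. Everything in it is an assumption made for illustration: the class name, the <code>key@windowStart</code> string encoding of the composite key, and the rule that a window is dropped once its start is older than the retention period (Kafka actually expires whole log segments). The point is only that each new window yields a new composite key, so compaction alone never removes anything.</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified model of a window store changelog; NOT Kafka's retention logic.
class WindowChangelogSketch {

    // Composite changelog key: the "normal" key plus the window start timestamp.
    static String compositeKey(String key, long windowStartMs) {
        return key + "@" + windowStartMs;
    }

    // Keys that survive compaction alone: one per distinct composite key.
    static Set<String> afterCompaction(List<String> compositeKeys) {
        return new LinkedHashSet<>(compositeKeys);
    }

    // Window starts that survive time-based deletion, assuming a window is dropped
    // once its start is older than the retention period.
    static List<Long> afterDeletion(List<Long> windowStartsMs, long nowMs, long retentionMs) {
        List<Long> kept = new ArrayList<>();
        for (long startMs : windowStartsMs) {
            if (nowMs - startMs <= retentionMs) {
                kept.add(startMs);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Long> windowStarts = new ArrayList<>();
        List<String> keys = new ArrayList<>();
        for (long startMs = 0; startMs < 300_000L; startMs += 60_000L) {
            windowStarts.add(startMs); // five one-minute windows for the same key
            keys.add(compositeKey("alice", startMs));
        }
        System.out.println(afterCompaction(keys).size());                     // prints 5
        System.out.println(afterDeletion(windowStarts, 300_000L, 120_000L)); // prints [180000, 240000]
    }
}
```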
<p>When you open an <code class="docutils literal"><span class="pre">Iterator</span></code> from a state store, you must call <code class="docutils literal"><span class="pre">close()</span></code> on the iterator when you are done working with
it to reclaim resources, or you can open the iterator in a try-with-resources statement so that it is closed automatically. If you do not close an iterator,
you may encounter an out-of-memory (OOM) error.</p>
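<p>As a minimal sketch of the try-with-resources pattern, the following helper sums all values in a key-value store. The class name <code class="docutils literal"><span class="pre">StoreScan</span></code>, the method name <code class="docutils literal"><span class="pre">sumAll</span></code>, and the <code class="docutils literal"><span class="pre">String</span></code>/<code class="docutils literal"><span class="pre">Long</span></code> types are assumptions made for illustration; the sketch relies only on <code class="docutils literal"><span class="pre">KeyValueIterator</span></code> being closeable.</p>
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Hypothetical helper class, for illustration only
public final class StoreScan {

    // Sums every value currently held in the store.
    public static long sumAll(final ReadOnlyKeyValueStore&lt;String, Long&gt; store) {
        long total = 0L;
        // The iterator is closed automatically when the try block exits,
        // even if an exception is thrown while iterating.
        try (KeyValueIterator&lt;String, Long&gt; iter = store.all()) {
            while (iter.hasNext()) {
                final KeyValue&lt;String, Long&gt; entry = iter.next();
                total += entry.value;
            }
        }
        return total;
    }
}</code></pre>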
</div>
<div class="section" id="enable-or-disable-fault-tolerance-of-state-stores-store-changelogs">
<span id="streams-developer-guide-state-store-enable-disable-fault-tolerance"></span><h3><a class="toc-backref" href="#id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a><a class="headerlink" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" title="Permalink to this headline"></a></h3>
<p>You can enable or disable fault tolerance for a state store by enabling or disabling the change logging
of the store through <code class="docutils literal"><span class="pre">withLoggingEnabled()</span></code> and <code class="docutils literal"><span class="pre">withLoggingDisabled()</span></code> on the store builder.
You can also fine-tune the associated topic&#8217;s configuration if needed.</p>
<p>Example for disabling fault-tolerance:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.KeyValueStore</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>
<span class="n">StoreBuilder</span><span class="o">&lt;</span><span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">countStoreSupplier</span> <span class="o">=</span> <span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">&quot;Counts&quot;</span><span class="o">),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span>
<span class="o">.</span><span class="na">withLoggingDisabled</span><span class="o">();</span> <span class="c1">// disable backing up the store to a changelog topic</span></pre></div>
</div>
<div class="admonition attention">
<p class="first admonition-title">Attention</p>
<p class="last">If the changelog is disabled then the attached state store is no longer fault tolerant and it can&#8217;t have any <a class="reference internal" href="config-streams.html#streams-developer-guide-standby-replicas"><span class="std std-ref">standby replicas</span></a>.</p>
</div>
<p>Here is an example for enabling fault tolerance with additional changelog-topic configuration.
You can add any log config from <a class="reference external" href="https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogConfig.scala">kafka.log.LogConfig</a>;
unrecognized configs are ignored.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.HashMap</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.Map</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.common.config.TopicConfig</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.KeyValueStore</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>
<span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">changelogConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o">&lt;&gt;();</span>
<span class="c1">// override min.insync.replicas</span>
<span class="n">changelogConfig</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">TopicConfig</span><span class="o">.</span><span class="na">MIN_IN_SYNC_REPLICAS_CONFIG</span><span class="o">,</span> <span class="s">&quot;1&quot;</span><span class="o">);</span>
<span class="n">StoreBuilder</span><span class="o">&lt;</span><span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">countStoreSupplier</span> <span class="o">=</span> <span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">&quot;Counts&quot;</span><span class="o">),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span>
<span class="o">.</span><span class="na">withLoggingEnabled</span><span class="o">(</span><span class="n">changelogConfig</span><span class="o">);</span> <span class="c1">// enable changelogging, with custom changelog settings</span></pre></div>
</div>
</div>
<div class="section" id="timestamped-state-stores">
<span id="streams-developer-guide-state-store-timestamps"></span><h3><a class="toc-backref" href="#id11">Timestamped State Stores</a><a class="headerlink" href="#timestamped-state-stores" title="Permalink to this headline"></a></h3>
<p>KTables always store timestamps by default.
A timestamped state store improves stream processing semantics and enables
handling out-of-order data in source KTables, detecting out-of-order joins and aggregations,
and getting the timestamp of the latest update in an Interactive Query.</p>
<p>You can query timestamped state stores both with and without a timestamp.</p>
<p><b>Upgrade note:</b> All users upgrade with a single rolling bounce per instance.</p>
<ul class="first simple">
<li>For Processor API users, nothing changes in existing applications, and you
have the option of using the timestamped stores.</li>
<li>For DSL operators, store data is upgraded lazily in the background.</li>
<li>No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt in
by implementing the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html">TimestampedBytesStore</a>
interface. Without opting in, the old format is retained, and Streams uses a proxy store
that removes/adds timestamps on read/write.</li>
</ul>
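<p>As a rough sketch of how a Processor API application might opt into a timestamped store, the following class builds a timestamped key-value store and updates a count together with its timestamp. The class name <code class="docutils literal"><span class="pre">TimestampedStoreExample</span></code>, the store name <code class="docutils literal"><span class="pre">&quot;TimestampedCounts&quot;</span></code>, and the counting logic are assumptions made for illustration.</p>
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

// Hypothetical example class, for illustration only
public final class TimestampedStoreExample {

    // Builds a persistent key-value store that keeps a timestamp next to each value.
    public static StoreBuilder&lt;TimestampedKeyValueStore&lt;String, Long&gt;&gt; builder() {
        return Stores.timestampedKeyValueStoreBuilder(
            Stores.persistentTimestampedKeyValueStore("TimestampedCounts"),
            Serdes.String(),
            Serdes.Long());
    }

    // Increments the count for a key and keeps the largest timestamp seen so far.
    public static void increment(final TimestampedKeyValueStore&lt;String, Long&gt; store,
                                 final String key,
                                 final long recordTimestamp) {
        final ValueAndTimestamp&lt;Long&gt; current = store.get(key);
        final long newCount = current == null ? 1L : current.value() + 1L;
        final long newTimestamp = current == null
            ? recordTimestamp
            : Math.max(current.timestamp(), recordTimestamp);
        store.put(key, ValueAndTimestamp.make(newCount, newTimestamp));
    }
}</code></pre>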
</div>
<div class="section" id="implementing-custom-state-stores">
<span id="streams-developer-guide-state-store-custom"></span><h3><a class="toc-backref" href="#id7">Implementing Custom State Stores</a><a class="headerlink" href="#implementing-custom-state-stores" title="Permalink to this headline"></a></h3>
<p>You can use the <a class="reference internal" href="#streams-developer-guide-state-store-defining"><span class="std std-ref">built-in state store types</span></a> or implement your own.
The primary interface to implement for the store is
<code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStore</span></code>. Kafka Streams also has a few extended interfaces such
as <code class="docutils literal"><span class="pre">KeyValueStore</span></code>.</p>
<p>Note that your customized <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStore</span></code> implementation also needs to provide the logic on how to restore the state
via the <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateRestoreCallback</span></code> or <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.BatchingStateRestoreCallback</span></code> interface.
Details on how to instantiate these interfaces can be found in the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/StateStore.html">javadocs</a>.</p>
<p>You also need to provide a &#8220;builder&#8221; for the store by implementing the
<code class="docutils literal"><span class="pre">org.apache.kafka.streams.state.StoreBuilder</span></code> interface, which Kafka Streams uses to create instances of
your store.</p>
</div>
</div>
<div class="section" id="accessing-processor-context">
<h2><a class="toc-backref" href="#id10">Accessing Processor Context</a><a class="headerlink" href="#accessing-processor-context" title="Permalink to this headline"></a></h2>
<p>As mentioned in the <a href="#defining-a-stream-processor">Defining a Stream Processor</a> section, a <code>ProcessorContext</code> controls the processing workflow, for example by scheduling a punctuation function and committing the current processed state.</p>
<p>This object can also be used to access application-related metadata such as
<code class="docutils literal"><span class="pre">applicationId</span></code>, <code class="docutils literal"><span class="pre">taskId</span></code>,
and <code class="docutils literal"><span class="pre">stateDir</span></code>, as well as record-related metadata such as <code class="docutils literal"><span class="pre">topic</span></code>,
<code class="docutils literal"><span class="pre">partition</span></code>, <code class="docutils literal"><span class="pre">offset</span></code>, <code class="docutils literal"><span class="pre">timestamp</span></code>, and
<code class="docutils literal"><span class="pre">headers</span></code>.</p>
<p>Here is an example implementation of how to add a new header to the record:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">public void process(String key, String value) {</span>
<span class="c1">// add a header to the record; header values are byte arrays (requires java.nio.charset.StandardCharsets)</span>
<span class="n">context()</span><span class="o">.</span><span class="na">headers</span><span class="o">()</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="s">&quot;key&quot;</span><span class="o">,</span> <span class="s">&quot;value&quot;</span><span class="o">.</span><span class="na">getBytes</span><span class="o">(</span><span class="n">StandardCharsets</span><span class="o">.</span><span class="na">UTF_8</span><span class="o">));</span>
<span class="o">}</span></pre></div>
</div>
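<p>The metadata accessors can be sketched in the same style. The following processor prints application and record metadata for each record and then forwards the record unchanged. The class name <code class="docutils literal"><span class="pre">MetadataLoggingProcessor</span></code> is hypothetical, and the sketch assumes a processor that extends <code class="docutils literal"><span class="pre">AbstractProcessor</span></code> so that <code class="docutils literal"><span class="pre">context()</span></code> is available.</p>
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.processor.AbstractProcessor;

// Hypothetical processor, for illustration only
public class MetadataLoggingProcessor extends AbstractProcessor&lt;String, String&gt; {

    @Override
    public void process(final String key, final String value) {
        // application-related metadata
        final String application = context().applicationId() + " / task " + context().taskId();
        // metadata of the record that is currently being processed
        final String origin = context().topic() + "-" + context().partition() + "@" + context().offset();
        System.out.println(application + " received key " + key + " from " + origin
            + " with timestamp " + context().timestamp());
        // pass the record on to the downstream processors unchanged
        context().forward(key, value);
    }
}</code></pre>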
<div class="section" id="connecting-processors-and-state-stores">
<h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a><a class="headerlink" href="#connecting-processors-and-state-stores" title="Permalink to this headline"></a></h2>
<p>Now that a <a class="reference internal" href="#streams-developer-guide-stream-processor"><span class="std std-ref">processor</span></a> (WordCountProcessor) and the
state stores have been defined, you can construct the processor topology by connecting these processors and state stores
using a <code class="docutils literal"><span class="pre">Topology</span></code> instance. In addition, you can add source processors that read input data streams
from the specified Kafka topics into the topology, and sink processors that write output data streams
from the topology to the specified Kafka topics.</p>
<p>Here is an example implementation:</p>
<pre class="line-numbers"><code class="language-java"> Topology builder = new Topology();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
// add the WordCountProcessor node which takes the source processor as its upstream processor
.addProcessor("Process", () -> new WordCountProcessor(), "Source")
// add the count store associated with the WordCountProcessor processor
.addStateStore(countStoreBuilder, "Process")
// add the sink processor node that takes Kafka topic "sink-topic" as output
// and the WordCountProcessor node as its upstream processor
.addSink("Sink", "sink-topic", "Process");</code></pre>
<p>Here is a quick explanation of this example:</p>
<ul class="simple">
<li>A source processor node named <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic
<code class="docutils literal"><span class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
<li>A processor node named <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> with the pre-defined <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> logic is then added as the downstream
processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node using the <code class="docutils literal"><span class="pre">addProcessor</span></code> method.</li>
<li>A predefined persistent key-value state store is created and associated with the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node, using
<code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li>
<li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node
as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
</ul>
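<p>The overloaded <code class="docutils literal"><span class="pre">addSink</span></code> variant mentioned in the last item takes a <code class="docutils literal"><span class="pre">TopicNameExtractor</span></code> instead of a fixed topic name. The following is a minimal sketch only: the class name <code class="docutils literal"><span class="pre">DynamicSinkExample</span></code>, the routing rule, and the topic names <code class="docutils literal"><span class="pre">&quot;sink-topic-a&quot;</span></code> and <code class="docutils literal"><span class="pre">&quot;sink-topic-other&quot;</span></code> are assumptions, and the sketch assumes that the upstream processor forwards <code class="docutils literal"><span class="pre">String</span></code> keys and values.</p>
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.TopicNameExtractor;

// Hypothetical example class, for illustration only
public final class DynamicSinkExample {

    // Adds a sink to the given topology that picks the output topic per record.
    public static Topology addDynamicSink(final Topology topology) {
        // Route words starting with "a" to one topic and everything else to another.
        // Both topics are assumed to exist already.
        final TopicNameExtractor&lt;String, String&gt; extractor =
            (key, value, recordContext) -&gt;
                key != null &amp;&amp; key.startsWith("a") ? "sink-topic-a" : "sink-topic-other";
        return topology.addSink("Sink", extractor, "Process");
    }
}</code></pre>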
<p>In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor to the topology.
This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
instead of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this:
</p>
<pre class="line-numbers"><code class="language-java"> Topology builder = new Topology();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
// add the WordCountProcessor node which takes the source processor as its upstream processor.
// the ProcessorSupplier provides the count store associated with the WordCountProcessor
.addProcessor("Process", new ProcessorSupplier&lt;String, String&gt;() {
public Processor&lt;String, String&gt; get() {
return new WordCountProcessor();
}
public Set&lt;StoreBuilder&lt;?&gt;&gt; stores() {
// stores() must return a Set, so wrap the builder (requires java.util.Collections)
return Collections.singleton(countStoreBuilder);
}
}, "Source")
// add the sink processor node that takes Kafka topic "sink-topic" as output
// and the WordCountProcessor node as its upstream processor
.addSink("Sink", "sink-topic", "Process");</code></pre>
<p>This allows a processor to "own" its state stores, effectively hiding their usage from the user wiring the topology.
Multiple processors that share a state store may provide the same store with this technique, as long as they all return the same <code class="docutils literal"><span class="pre">StoreBuilder</span></code> instance.</p>
<p>In these topologies, the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node, and an
upstream processor of the <code class="docutils literal"><span class="pre">&quot;Sink&quot;</span></code> node. As a result, whenever the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node forwards a newly fetched record from
Kafka to its downstream <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node, the <code class="docutils literal"><span class="pre">WordCountProcessor#process()</span></code> method is triggered to process the record and
update the associated state store. Whenever <code class="docutils literal"><span class="pre">context#forward()</span></code> is called in the
<code class="docutils literal"><span class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate key-value pair will be sent via the <code class="docutils literal"><span class="pre">&quot;Sink&quot;</span></code> processor node to
the Kafka topic <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code>. Note that in the <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> implementation, you must refer to the
same store name <code class="docutils literal"><span class="pre">&quot;Counts&quot;</span></code> when accessing the key-value store, otherwise an exception will be thrown at runtime,
indicating that the state store cannot be found. If the state store is not associated with the processor
in the <code class="docutils literal"><span class="pre">Topology</span></code> code, accessing it in the processor&#8217;s <code class="docutils literal"><span class="pre">init()</span></code> method will also throw an exception at
runtime, indicating the state store is not accessible from this processor.</p>
<p>Now that you have fully defined your processor topology in your application, you can proceed to
<a class="reference internal" href="running-app.html#streams-developer-guide-execution"><span class="std std-ref">running the Kafka Streams application</span></a>.</p>
</div>
</div>
</div>
</div>
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/dsl-api" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
<!--#include virtual="../../../includes/_header.htm" -->
<!--#include virtual="../../../includes/_top.htm" -->
<div class="content documentation ">
<!--#include virtual="../../../includes/_nav.htm" -->
<div class="right">
<!--//#include virtual="../../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a></li>
<li><a href="/documentation/streams">Kafka Streams</a></li>
<li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
</ul>
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
//sticky secondary nav
var $navbar = $(".sub-nav-sticky"),
y_pos = $navbar.offset().top,
height = $navbar.height();
$(window).scroll(function() {
var scrollTop = $(window).scrollTop();
if (scrollTop > y_pos - height) {
$navbar.addClass("navbar-fixed")
} else if (scrollTop <= y_pos) {
$navbar.removeClass("navbar-fixed")
}
});
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>