| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <script><!--#include virtual="../../js/templateData.js" --></script> |
| |
| <script id="content-template" type="text/x-handlebars-template"> |
| <!-- h1>Developer Guide for Kafka Streams</h1 --> |
| <div class="sub-nav-sticky"> |
| <div class="sticky-top"> |
| <!-- div style="height:35px"> |
| <a href="/{{version}}/documentation/streams/">Introduction</a> |
| <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a> |
| <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a> |
| <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a> |
| <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a> |
| </div --> |
| </div> |
| </div> |
| |
| <div class="section" id="processor-api"> |
| <span id="streams-developer-guide-processor-api"></span><h1>Processor API<a class="headerlink" href="#processor-api" title="Permalink to this headline"></a></h1> |
| <p>The Processor API allows developers to define and connect custom processors and to interact with state stores. With the |
| Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these |
| processors with their associated state stores to compose the processor topology that represents a customized processing |
| logic.</p> |
| <div class="contents local topic" id="table-of-contents"> |
| <p class="topic-title first"><b>Table of Contents</b></p> |
| <ul class="simple"> |
| <li><a class="reference internal" href="#overview" id="id1">Overview</a></li> |
| <li><a class="reference internal" href="#defining-a-stream-processor" id="id2">Defining a Stream |
| Processor</a></li> |
| <li><a class="reference internal" href="#unit-testing-processors" id="id9">Unit Testing Processors</a></li> |
| <li><a class="reference internal" href="#state-stores" id="id3">State Stores</a> |
| <ul> |
| <li><a class="reference internal" href="#defining-and-creating-a-state-store" id="id4">Defining and creating a State Store</a></li> |
| <li><a class="reference internal" href="#fault-tolerant-state-stores" id="id5">Fault-tolerant State Stores</a></li> |
| <li><a class="reference internal" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" id="id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a></li> |
| <li><a class="reference internal" href="#timestamped-state-stores" id="id11">Timestamped State Stores</a></li> |
| <li><a class="reference internal" href="#implementing-custom-state-stores" id="id7">Implementing Custom State Stores</a></li> |
| </ul> |
| </li> |
| <li><a class="reference internal" href="#connecting-processors-and-state-stores" id="id8">Connecting Processors and State Stores</a></li> |
| <li><a class="reference internal" href="#accessing-processor-context" id="id10">Accessing Processor Context</a></li> |
| </ul> |
| </div> |
| <div class="section" id="overview"> |
| <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#overview" title="Permalink to this headline"></a></h2> |
<p>The Processor API can be used to implement both <strong>stateless</strong> and <strong>stateful</strong> operations; stateful operations are
  achieved through the use of <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a>.</p>
| <div class="admonition tip"> |
| <p><b>Tip</b></p> |
| <p class="last"><strong>Combining the DSL and the Processor API:</strong> |
| You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the |
| section <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-process"><span class="std std-ref">Applying processors and transformers (Processor API integration)</span></a>.</p> |
| </div> |
| <p>For a complete list of available API functionality, see the <a href="/{{version}}/javadoc/org/apache/kafka/streams/package-summary.html">Streams</a> API docs.</p> |
| </div> |
| <div class="section" id="defining-a-stream-processor"> |
| <span id="streams-developer-guide-stream-processor"></span><h2><a class="toc-backref" href="#id2">Defining a Stream Processor</a><a class="headerlink" href="#defining-a-stream-processor" title="Permalink to this headline"></a></h2> |
<p>A <a class="reference internal" href="../core-concepts.html#streams_processor_node"><span class="std std-ref">stream processor</span></a> is a node in the processor topology that represents a single processing step.
  With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect
  these processors with their associated state stores to compose the processor topology.</p>
<p>You can define a customized stream processor by implementing the <code class="docutils literal"><span class="pre">Processor</span></code> interface, which provides the <code class="docutils literal"><span class="pre">process()</span></code> API method.
  The <code class="docutils literal"><span class="pre">process()</span></code> method is called once for each received record.</p>
<p>The <code class="docutils literal"><span class="pre">Processor</span></code> interface also has an <code class="docutils literal"><span class="pre">init()</span></code> method, which is called by the Kafka Streams library during the task construction
  phase. Processor instances should perform any required initialization in this method. The <code class="docutils literal"><span class="pre">init()</span></code> method passes in a <code class="docutils literal"><span class="pre">ProcessorContext</span></code>
  instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
  its corresponding message offset, and other such information. You can also use this context instance to schedule a punctuation
  function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>),
  and to request a commit of the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).
  Any resources you set up in <code class="docutils literal"><span class="pre">init()</span></code> can be cleaned up in the
  <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may reuse a single
  <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
  <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
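<p>Because a <code class="docutils literal"><span class="pre">Processor</span></code> object may be re-initialized after <code class="docutils literal"><span class="pre">close()</span></code>, per-run resources are best created in <code class="docutils literal"><span class="pre">init()</span></code> rather than in the constructor, and released in <code class="docutils literal"><span class="pre">close()</span></code>. The following plain-Java sketch illustrates this with a hypothetical <code class="docutils literal"><span class="pre">LifecycleProcessor</span></code> interface that only mimics the <code class="docutils literal"><span class="pre">init()</span></code>/<code class="docutils literal"><span class="pre">process()</span></code>/<code class="docutils literal"><span class="pre">close()</span></code> sequence; it is not the Kafka Streams <code class="docutils literal"><span class="pre">Processor</span></code> interface, and all class names are made up for illustration.</p>

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleDemo {

    // Hypothetical stand-in for the init()/process()/close() lifecycle; not a Kafka Streams type.
    interface LifecycleProcessor {
        void init();
        void process(String record);
        void close();
    }

    // Hypothetical processor that buffers records in a resource created by init().
    static final class BufferingProcessor implements LifecycleProcessor {
        private List<String> buffer;

        @Override
        public void init() {
            // Acquire resources here, not in the constructor: the same instance
            // may be initialized again after close().
            buffer = new ArrayList<>();
        }

        @Override
        public void process(String record) {
            buffer.add(record);
        }

        @Override
        public void close() {
            // Release resources; a later init() call re-creates them.
            buffer = null;
        }

        int bufferedCount() {
            return buffer == null ? 0 : buffer.size();
        }
    }

    public static void main(String[] args) {
        BufferingProcessor processor = new BufferingProcessor();
        processor.init();
        processor.process("a");
        processor.close();

        // Reuse after close(): works because init() re-creates the buffer.
        processor.init();
        processor.process("b");
        System.out.println(processor.bufferedCount()); // 1
    }
}
```

<p>Had the buffer been created in the constructor and released in <code class="docutils literal"><span class="pre">close()</span></code>, the second <code class="docutils literal"><span class="pre">process()</span></code> call would fail on the released resource.</p>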
<p>When records are forwarded to downstream processors, they are also assigned a timestamp. There are two different default behaviors:
  (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code>, the output record inherits the input record's timestamp.
  (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code>, the output record inherits the current punctuation timestamp (either the current stream-time or the system wall-clock time).
  Note that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows you to change the default behavior by passing a custom timestamp for the output record.</p>
| <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code> |
| API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used |
| for the punctuation scheduling: either <a class="reference internal" href="../core-concepts.html#streams_time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time |
| is configured to represent event-time via <code class="docutils literal"><span class="pre">TimestampExtractor</span></code>). When stream-time is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely |
| by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there |
| is no new input data arriving, stream-time is not advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called.</p> |
<p>For example, if you schedule a <code class="docutils literal"><span class="pre">Punctuator</span></code> function every 10 seconds based on <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> and then
  process a stream of 60 records with consecutive timestamps from 1 second (first record) to 60 seconds (last record),
  <code class="docutils literal"><span class="pre">punctuate()</span></code> would be called 6 times. This is independent of how long it takes to actually process those records: <code class="docutils literal"><span class="pre">punctuate()</span></code>
  would be called 6 times whether processing these 60 records takes a second, a minute, or an hour.</p>
<p>When wall-clock-time (i.e. <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely by the wall-clock time.
  Reusing the example above, if the <code class="docutils literal"><span class="pre">Punctuator</span></code> function is scheduled based on <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>, and if these
  60 records were processed within 20 seconds, <code class="docutils literal"><span class="pre">punctuate()</span></code> is called 2 times (once every 10 seconds). If these 60 records
  were processed within 5 seconds, then <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called at all. Note that you can schedule multiple <code class="docutils literal"><span class="pre">Punctuator</span></code>
  callbacks with different <code class="docutils literal"><span class="pre">PunctuationType</span></code> types within the same processor by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> multiple
  times inside the <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
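<p>The arithmetic in the two examples above can be reproduced with the following plain-Java model. It is a simplified sketch under stated assumptions, not Kafka's actual scheduler: it assumes a single input partition, treats stream-time as the largest record timestamp seen so far, assumes the first stream-time punctuation boundary lies one interval after time zero, skips boundaries that stream-time jumps over, and for wall-clock-time simply counts full intervals of elapsed processing time. <code class="docutils literal"><span class="pre">PunctuationModel</span></code> and its methods are hypothetical names.</p>

```java
public class PunctuationModel {

    // Simplified STREAM_TIME model (assumption, not Kafka's scheduler): stream-time is the
    // largest record timestamp seen so far; punctuate() fires once whenever stream-time
    // reaches the next interval boundary. Boundaries jumped over in a gap are skipped.
    static int streamTimePunctuations(long[] recordTimestampsMs, long intervalMs) {
        long streamTime = 0L;
        long nextBoundary = intervalMs; // assumed first boundary: one interval after time zero
        int calls = 0;
        for (long ts : recordTimestampsMs) {
            streamTime = Math.max(streamTime, ts); // stream-time never moves backwards
            if (streamTime >= nextBoundary) {
                calls++;
                nextBoundary = (streamTime / intervalMs + 1) * intervalMs;
            }
        }
        return calls;
    }

    // Simplified WALL_CLOCK_TIME model: one call per full interval of elapsed system time,
    // regardless of how many records were processed or what their timestamps are.
    static long wallClockPunctuations(long elapsedWallClockMs, long intervalMs) {
        return elapsedWallClockMs / intervalMs;
    }

    public static void main(String[] args) {
        long[] timestamps = new long[60];
        for (int i = 0; i < 60; i++) {
            timestamps[i] = (i + 1) * 1000L; // 1 s, 2 s, ..., 60 s
        }
        System.out.println(streamTimePunctuations(timestamps, 10_000L)); // 6
        System.out.println(wallClockPunctuations(20_000L, 10_000L));     // 2
        System.out.println(wallClockPunctuations(5_000L, 10_000L));      // 0
    }
}
```

<p>The stream-time method never looks at processing duration, so it returns 6 whether the records take a second or an hour to process; the wall-clock method, conversely, ignores record timestamps entirely.</p>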
| <div class="admonition attention"> |
| <p class="first admonition-title"><b>Attention</b></p> |
<p class="last">Stream-time is only advanced if all input partitions across all input topics have new data (with newer timestamps) available.
| If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified. |
| This behavior is independent of the configured timestamp extractor, i.e., using <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code> does not enable wall-clock triggering of <code class="docutils literal"><span class="pre">punctuate()</span></code>.</p> |
| </div> |
| <p><b>Example</b></p> |
<p>The following example <code class="docutils literal"><span class="pre">Processor</span></code> implements a simple word-count algorithm and performs the following actions:</p>
<ul class="simple">
<li>In the <code class="docutils literal"><span class="pre">init()</span></code> method, retrieve the local state store by its name “Counts” and schedule a punctuation every second (1000 milliseconds) based on stream-time.</li>
<li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words and update their counts in the state store (state stores are described later on this page).</li>
<li>In the punctuation callback passed to <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>, iterate over the local state store, send the aggregated counts to the downstream processors (connecting processors is described later on this page), and commit the current stream state.</li>
| </ul> |
<div class="highlight-java"><div class="highlight"><pre><span></span>import java.time.Duration;
import java.util.Locale;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in the punctuation callback
        this.context = context;

        // retrieve the key-value store named "Counts"
        kvStore = (KeyValueStore<String, Long>) context.getStateStore("Counts");

        // schedule a punctuation every second based on stream-time
        this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, (timestamp) -> {
            // try-with-resources closes the iterator even if forwarding throws
            try (KeyValueIterator<String, Long> iter = this.kvStore.all()) {
                while (iter.hasNext()) {
                    KeyValue<String, Long> entry = iter.next();
                    context.forward(entry.key, entry.value.toString());
                }
            }

            // request a commit of the current processing progress
            context.commit();
        });
    }

    @Override
    public void process(String key, String line) {
        // split the value into words and increment each word's count in the store
        for (String word : line.toLowerCase(Locale.getDefault()).split(" ")) {
            Long oldCount = kvStore.get(word);
            kvStore.put(word, oldCount == null ? 1L : oldCount + 1);
        }
    }

    @Override
    public void close() {
        // close any resources managed by this processor
        // Note: Do not close any StateStores as these are managed by the library
    }

}</pre></div>
| </div> |
| <div class="admonition note"> |
| <p><b>Note</b></p> |
| <p class="last"><strong>Stateful processing with state stores:</strong> |
The <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> defined above can access the currently received record in its <code class="docutils literal"><span class="pre">process()</span></code> method, and it can
  use <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a> to maintain processing state, for example to remember recently
  arrived records for stateful operations such as aggregations and joins. For more information, see the <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a> documentation.</p>
| </div> |
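<p>To see what <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> emits without running a Kafka cluster, the following plain-Java model mirrors its <code class="docutils literal"><span class="pre">process()</span></code> and punctuation logic, with a <code class="docutils literal"><span class="pre">TreeMap</span></code> standing in for the “Counts” <code class="docutils literal"><span class="pre">KeyValueStore</span></code>. <code class="docutils literal"><span class="pre">WordCountModel</span></code> is a hypothetical illustration, not part of Kafka Streams; the sorted map is used only so that the emitted order is predictable.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.TreeMap;

public class WordCountModel {

    // Stand-in for the "Counts" KeyValueStore<String, Long>; sorted for a predictable output order.
    private final TreeMap<String, Long> counts = new TreeMap<>();

    // Mirrors process(): split the value into words and increment each word's count.
    public void process(String line) {
        for (String word : line.toLowerCase(Locale.ROOT).split(" ")) {
            counts.merge(word, 1L, Long::sum);
        }
    }

    // Mirrors the punctuation callback: emit every (word, count) pair currently in the store.
    public List<String> punctuate() {
        List<String> forwarded = new ArrayList<>();
        counts.forEach((word, count) -> forwarded.add(word + "=" + count));
        return forwarded;
    }

    public static void main(String[] args) {
        WordCountModel model = new WordCountModel();
        model.process("Hello Kafka Streams");
        model.process("hello world");
        System.out.println(model.punctuate()); // [hello=2, kafka=1, streams=1, world=1]
    }
}
```

<p>The counts are never reset, so each punctuation re-emits the running total for every word seen so far.</p>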
| </div> |
| <div class="section" id="unit-testing-processors"> |
| <h2> |
| <a class="toc-backref" href="#id9">Unit Testing Processors</a> |
| <a class="headerlink" href="#unit-testing-processors" title="Permalink to this headline"></a> |
| </h2> |
| <p> |
Kafka Streams comes with a <code>test-utils</code> module to help you write unit tests for your
processors; see <a href="testing.html#unit-testing-processors">unit testing processors</a> for details.
| </p> |
| </div> |
| <div class="section" id="state-stores"> |
| <span id="streams-developer-guide-state-store"></span><h2><a class="toc-backref" href="#id3">State Stores</a><a class="headerlink" href="#state-stores" title="Permalink to this headline"></a></h2> |
| <p>To implement a <strong>stateful</strong> <code class="docutils literal"><span class="pre">Processor</span></code> or <code class="docutils literal"><span class="pre">Transformer</span></code>, you must provide one or more state stores to the processor |
| or transformer (<em>stateless</em> processors or transformers do not need state stores). State stores can be used to remember |
| recently received input records, to track rolling aggregates, to de-duplicate input records, and more. |
| Another feature of state stores is that they can be |
| <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactively queried</span></a> from other applications, such as a |
Node.js-based dashboard or a microservice implemented in Scala or Go.</p>
| <p>The |
| <a class="reference internal" href="#streams-developer-guide-state-store-defining"><span class="std std-ref">available state store types</span></a> in Kafka Streams have |
| <a class="reference internal" href="#streams-developer-guide-state-store-fault-tolerance"><span class="std std-ref">fault tolerance</span></a> enabled by default.</p> |
| <div class="section" id="defining-and-creating-a-state-store"> |
| <span id="streams-developer-guide-state-store-defining"></span><h3><a class="toc-backref" href="#id4">Defining and creating a State Store</a><a class="headerlink" href="#defining-and-creating-a-state-store" title="Permalink to this headline"></a></h3> |
| <p>You can either use one of the available store types or |
| <a class="reference internal" href="#streams-developer-guide-state-store-custom"><span class="std std-ref">implement your own custom store type</span></a>. |
| It’s common practice to leverage an existing store type via the <code class="docutils literal"><span class="pre">Stores</span></code> factory.</p> |
| <p>Note that, when using Kafka Streams, you normally don’t create or instantiate state stores directly in your code. |
| Rather, you define state stores indirectly by creating a so-called <code class="docutils literal"><span class="pre">StoreBuilder</span></code>. This builder is used by |
| Kafka Streams as a factory to instantiate the actual state stores locally in application instances when and where |
| needed.</p> |
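<p>The factory role of a <code class="docutils literal"><span class="pre">StoreBuilder</span></code> can be pictured with the following plain-Java analogy, in which a <code class="docutils literal"><span class="pre">java.util.function.Supplier</span></code> of <code class="docutils literal"><span class="pre">HashMap</span></code> instances stands in for the builder. It assumes that the runtime asks the factory for a separate store instance wherever one is needed (for example, one per stream task); it does not use the Kafka Streams API, and the class and method names are hypothetical.</p>

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class StoreFactoryModel {

    // Hypothetical stand-in for a StoreBuilder: the application supplies the recipe,
    // and the runtime decides when to call it.
    static final Supplier<Map<String, Long>> COUNTS_STORE_FACTORY = HashMap::new;

    // Hypothetical runtime step: create one store instance per task from the shared factory.
    static Map<String, Long> createStoreForTask() {
        return COUNTS_STORE_FACTORY.get();
    }

    public static void main(String[] args) {
        Map<String, Long> storeForTask0 = createStoreForTask();
        Map<String, Long> storeForTask1 = createStoreForTask();
        storeForTask0.put("kafka", 1L);
        // The two instances are independent: writes to one are not visible in the other.
        System.out.println(storeForTask1.containsKey("kafka")); // false
    }
}
```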
| <p>The following store types are available out of the box.</p> |
| <table border="1" class="non-scrolling-table width-100-percent docutils"> |
| <colgroup> |
| <col width="19%" /> |
| <col width="11%" /> |
| <col width="18%" /> |
| <col width="51%" /> |
| </colgroup> |
| <thead valign="bottom"> |
| <tr class="row-odd"><th class="head">Store Type</th> |
| <th class="head">Storage Engine</th> |
| <th class="head">Fault-tolerant?</th> |
| <th class="head">Description</th> |
| </tr> |
| </thead> |
| <tbody valign="top"> |
| <tr class="row-even"><td>Persistent |
| <code class="docutils literal"><span class="pre">KeyValueStore<K,</span> <span class="pre">V></span></code></td> |
| <td>RocksDB</td> |
| <td>Yes (enabled by default)</td> |
| <td><ul class="first simple"> |
| <li><strong>The recommended store type for most use cases.</strong></li> |
| <li>Stores its data on local disk.</li> |
| <li>Storage capacity: |
| managed local state can be larger than the memory (heap space) of an |
| application instance, but must fit into the available local disk |
| space.</li> |
| <li>RocksDB settings can be fine-tuned, see |
| <a class="reference internal" href="config-streams.html#streams-developer-guide-rocksdb-config"><span class="std std-ref">RocksDB configuration</span></a>.</li> |
| <li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore-java.lang.String-">store variants</a>: |
| time window key-value store, session window key-value store.</li> |
| <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore-java.lang.String-">persistentTimestampedKeyValueStore</a> |
| when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries.</li> |
| <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore-java.lang.String-java.time.Duration-java.time.Duration-boolean-">persistentTimestampedWindowStore</a> |
| when you need a persistent windowedKey-(value/timestamp) store.</li> |
| </ul> |
| <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating a persistent key-value store:</span> |
| <span class="c1">// here, we create a `KeyValueStore<String, Long>` named "persistent-counts".</span> |
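<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.KeyValueStore</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>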
| |
| <span class="c1">// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.</span> |
| <span class="n">StoreBuilder</span><span class="o"><</span><span class="n">KeyValueStore</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">>></span> <span class="n">countStoreSupplier</span> <span class="o">=</span> |
| <span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span> |
| <span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">"persistent-counts"</span><span class="o">),</span> |
| <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> |
| <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span> |
| <span class="n">KeyValueStore</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span></code></pre></div> |
| </div> |
| </td> |
| </tr> |
| <tr class="row-odd"><td>In-memory |
| <code class="docutils literal"><span class="pre">KeyValueStore<K,</span> <span class="pre">V></span></code></td> |
| <td>-</td> |
| <td>Yes (enabled by default)</td> |
| <td><ul class="first simple"> |
| <li>Stores its data in memory.</li> |
| <li>Storage capacity: |
| managed local state must fit into memory (heap space) of an |
| application instance.</li> |
| <li>Useful when application instances run in an environment where local |
| disk space is either not available or local disk space is wiped |
| in-between app instance restarts.</li> |
| <li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-">store variants</a>: |
| time window key-value store, session window key-value store.</li> |
| <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html">TimestampedKeyValueStore</a> |
| when you need a key-(value/timestamp) store that supports put/get/delete and range queries.</li> |
| <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html">TimestampedWindowStore</a> |
| when you need to store windowedKey-(value/timestamp) pairs.</li> |
| </ul> |
| <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating an in-memory key-value store:</span> |
| <span class="c1">// here, we create a `KeyValueStore<String, Long>` named "inmemory-counts".</span> |
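<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.KeyValueStore</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>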
| |
| <span class="c1">// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.</span> |
| <span class="n">StoreBuilder</span><span class="o"><</span><span class="n">KeyValueStore</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">>></span> <span class="n">countStoreSupplier</span> <span class="o">=</span> |
| <span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span> |
| <span class="n">Stores</span><span class="o">.</span><span class="na">inMemoryKeyValueStore</span><span class="o">(</span><span class="s">"inmemory-counts"</span><span class="o">),</span> |
| <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> |
| <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span> |
| <span class="n">KeyValueStore</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span></code></pre></div> |
| </div> |
| </td> |
| </tr> |
| </tbody> |
| </table> |
| </div> |
| <div class="section" id="fault-tolerant-state-stores"> |
| <span id="streams-developer-guide-state-store-fault-tolerance"></span><h3><a class="toc-backref" href="#id5">Fault-tolerant State Stores</a><a class="headerlink" href="#fault-tolerant-state-stores" title="Permalink to this headline"></a></h3> |
<p>To make state stores fault-tolerant and to allow for state store migration without data loss, a state store can be
  continuously backed up to a Kafka topic behind the scenes. Migration is needed, for example, to move a stateful stream task from one
  machine to another when <a class="reference internal" href="running-app.html#streams-developer-guide-execution-scaling"><span class="std std-ref">elastically adding or removing capacity from your application</span></a>.
  This topic is sometimes referred to as the state store’s associated <em>changelog topic</em>, or its <em>changelog</em>. If a
  machine fails, for example, the state store and thus the application’s state can be fully restored from its changelog. You can
  <a class="reference internal" href="#streams-developer-guide-state-store-enable-disable-fault-tolerance"><span class="std std-ref">enable or disable this backup feature</span></a> for a
  state store.</p>
| <p>Fault-tolerant state stores are backed by a |
| <a class="reference external" href="https://kafka.apache.org/documentation.html#compaction">compacted</a> changelog topic. The purpose of compacting this |
| topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster, |
| and to minimize recovery time if a state store needs to be restored from its changelog topic.</p> |
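<p>Why a compacted changelog is enough to rebuild a key-value store can be shown with a small plain-Java model. The sketch below is a simplification and not Kafka's log cleaner: its <code class="docutils literal"><span class="pre">compact()</span></code> keeps only the latest record per key and drops keys whose latest record is a deletion marker (a tombstone), whereas real compaction retains tombstones for a configurable period before removing them. <code class="docutils literal"><span class="pre">ChangelogModel</span></code> and its methods are hypothetical names.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogModel {

    // A changelog record: the store key and its new value (null marks a deletion, i.e. a tombstone).
    static final class Change {
        final String key;
        final Long value;

        Change(String key, Long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Simplified compaction: keep only the latest record per key, in log order,
    // and drop keys whose latest record is a tombstone.
    static List<Change> compact(List<Change> log) {
        Map<String, Change> latest = new LinkedHashMap<>();
        for (Change c : log) {
            latest.remove(c.key); // re-insert so the survivor keeps its latest position
            latest.put(c.key, c);
        }
        List<Change> compacted = new ArrayList<>();
        for (Change c : latest.values()) {
            if (c.value != null) {
                compacted.add(c);
            }
        }
        return compacted;
    }

    // Restore a key-value store by replaying the changelog from the beginning.
    static Map<String, Long> restore(List<Change> log) {
        Map<String, Long> store = new HashMap<>();
        for (Change c : log) {
            if (c.value == null) {
                store.remove(c.key);
            } else {
                store.put(c.key, c.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Change> log = List.of(
                new Change("kafka", 1L), new Change("streams", 1L),
                new Change("kafka", 2L), new Change("streams", null),
                new Change("kafka", 3L));
        System.out.println(restore(log));          // {kafka=3}
        System.out.println(compact(log).size());   // 1
        System.out.println(restore(compact(log))); // {kafka=3}
    }
}
```

<p>Replaying the single surviving record yields the same store contents as replaying all five, which is why compaction shortens recovery without losing state.</p>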
| <p>Fault-tolerant windowed state stores are backed by a changelog topic that uses both compaction and |
| deletion. This combination is required because of the structure of the message keys sent to the changelog topics of window stores: |
| the keys are composite keys that include the “normal” key and window timestamps. Because every new window produces new composite keys, |
| compaction alone would never remove the records of old windows and could not prevent a changelog topic from growing without bound. With deletion |
| enabled, old windows that have expired are cleaned up by Kafka’s log cleaner as the log segments expire. The |
| default retention setting is <code class="docutils literal"><span class="pre">Windows#maintainMs()</span></code> + 1 day. You can override the additional 1 day by specifying |
| <code class="docutils literal"><span class="pre">StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG</span></code> in the <code class="docutils literal"><span class="pre">StreamsConfig</span></code>.</p> |
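<p>To show why compaction alone is not enough for window stores, here is a simplified, stdlib-only model. It is not Kafka's real key encoding or log cleaner. Each changelog record is keyed by a hypothetical composite key <code>key@windowStart</code>. Compaction keeps the latest record per composite key, and deletion drops records older than the retention period. The retention values used with it (a 1-hour <code>maintainMs</code> plus 1 day of additional retention) are illustrative assumptions.</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a windowed changelog topic. This is NOT Kafka's real key
// encoding or log cleaner; it only illustrates why compaction alone is not enough.
class WindowedChangelogModel {

    // One changelog record: a composite key and the time the record was written.
    static final class Entry {
        final String compositeKey;
        final long value;
        final long writeTimeMs;

        Entry(String compositeKey, long value, long writeTimeMs) {
            this.compositeKey = compositeKey;
            this.value = value;
            this.writeTimeMs = writeTimeMs;
        }
    }

    // Hypothetical composite key: the "normal" key plus the window start timestamp.
    static String compositeKey(String key, long windowStartMs) {
        return key + "@" + windowStartMs;
    }

    // Changelog retention in this model: store retention plus additional retention.
    static long changelogRetentionMs(long maintainMs, long additionalRetentionMs) {
        return maintainMs + additionalRetentionMs;
    }

    // Compaction: keep only the latest record for each composite key.
    static Map<String, Entry> compact(List<Entry> log) {
        Map<String, Entry> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            latest.put(e.compositeKey, e);
        }
        return latest;
    }

    // Deletion: drop records written more than retentionMs before nowMs.
    static List<Entry> delete(List<Entry> log, long nowMs, long retentionMs) {
        List<Entry> kept = new ArrayList<>();
        for (Entry e : log) {
            if (nowMs - e.writeTimeMs <= retentionMs) {
                kept.add(e);
            }
        }
        return kept;
    }

    // One update for the same key in each of `windows` consecutive 1-minute windows,
    // each written at its window start time.
    static List<Entry> buildLog(int windows) {
        long windowSizeMs = 60_000L;
        List<Entry> log = new ArrayList<>();
        for (int i = 0; i < windows; i++) {
            long windowStartMs = i * windowSizeMs;
            log.add(new Entry(compositeKey("user-1", windowStartMs), 1L, windowStartMs));
        }
        return log;
    }
}
```

<p>In this model, three days of 1-minute windows for a single key still leave 4,320 records after compaction, because no composite key ever repeats. With a 1-hour <code>maintainMs</code> plus 1 day, retention is 90,000,000 ms (25 hours), and adding deletion bounds the log to the windows written inside that period.</p>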
| <p>When you open an <code class="docutils literal"><span class="pre">Iterator</span></code> from a state store, you must call <code class="docutils literal"><span class="pre">close()</span></code> on the iterator when you are done working with |
| it to reclaim its resources; alternatively, open the iterator in a try-with-resources statement so that it is closed automatically. If you do not close an iterator, |
| you may encounter an out-of-memory (OOM) error.</p> |
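<p>The try-with-resources pattern is sketched below with a stdlib-only stand-in. The hypothetical <code>CountingIterator</code> plays the role of a store iterator such as <code>KeyValueIterator</code>, which in Kafka Streams is both an <code>Iterator</code> and <code>Closeable</code>. The open-iterator counter exists only to make leaks visible in this sketch and is not a Kafka feature.</p>

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a state store iterator (e.g. KeyValueIterator), which holds
// resources until close() is called.
class CountingIterator<T> implements Iterator<T>, AutoCloseable {
    static int openIterators = 0; // illustration only: counts iterators not yet closed

    private final Iterator<T> delegate;
    private boolean closed = false;

    CountingIterator(List<T> items) {
        this.delegate = items.iterator();
        openIterators++;
    }

    @Override
    public boolean hasNext() {
        return !closed && delegate.hasNext();
    }

    @Override
    public T next() {
        if (closed) {
            throw new NoSuchElementException("iterator already closed");
        }
        return delegate.next();
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            openIterators--;
        }
    }
}

class IteratorUsage {
    // Sums values until the first negative one. Even the early return closes the iterator.
    // With Kafka Streams the shape is the same, e.g.
    //   try (KeyValueIterator<String, Long> iter = store.all()) { ... }
    static long sumUntilNegative(List<Long> values) {
        long sum = 0;
        try (CountingIterator<Long> iter = new CountingIterator<>(values)) {
            while (iter.hasNext()) {
                long v = iter.next();
                if (v < 0) {
                    return sum; // close() runs automatically before the method returns
                }
                sum += v;
            }
        }
        return sum;
    }
}
```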
| </div> |
| <div class="section" id="enable-or-disable-fault-tolerance-of-state-stores-store-changelogs"> |
| <span id="streams-developer-guide-state-store-enable-disable-fault-tolerance"></span><h3><a class="toc-backref" href="#id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a><a class="headerlink" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" title="Permalink to this headline"></a></h3> |
| <p>You can enable or disable fault tolerance for a state store by enabling or disabling change logging |
| for the store through the <code class="docutils literal"><span class="pre">StoreBuilder</span></code> methods <code class="docutils literal"><span class="pre">withLoggingEnabled()</span></code> and <code class="docutils literal"><span class="pre">withLoggingDisabled()</span></code>. |
| You can also fine-tune the associated topic’s configuration if needed.</p> |
| <p>Example of disabling fault tolerance:</p> |
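| <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.KeyValueStore</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span> |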
| |
| <span class="n">StoreBuilder</span><span class="o"><</span><span class="n">KeyValueStore</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">>></span> <span class="n">countStoreSupplier</span> <span class="o">=</span> <span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span> |
| <span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">"Counts"</span><span class="o">),</span> |
| <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> |
| <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span> |
| <span class="o">.</span><span class="na">withLoggingDisabled</span><span class="o">();</span> <span class="c1">// disable backing up the store to a changelog topic</span></code></pre></div> |
| </div> |
| <div class="admonition attention"> |
| <p class="first admonition-title">Attention</p> |
| <p class="last">If the changelog is disabled, the attached state store is no longer fault-tolerant and it can’t have any <a class="reference internal" href="config-streams.html#streams-developer-guide-standby-replicas"><span class="std std-ref">standby replicas</span></a>.</p> |
| </div> |
| <p>Here is an example of enabling fault tolerance with additional changelog-topic configuration. |
| You can add any log config from <a class="reference external" href="https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogConfig.scala">kafka.log.LogConfig</a>; |
| unrecognized configs are ignored.</p> |
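| <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.HashMap</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">java.util.Map</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.common.config.TopicConfig</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.KeyValueStore</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span> |
| |
| <span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">changelogConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><>();</span> |
| <span class="c1">// override min.insync.replicas</span> |
| <span class="n">changelogConfig</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">TopicConfig</span><span class="o">.</span><span class="na">MIN_IN_SYNC_REPLICAS_CONFIG</span><span class="o">,</span> <span class="s">"1"</span><span class="o">);</span> |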
| |
| <span class="n">StoreBuilder</span><span class="o"><</span><span class="n">KeyValueStore</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">>></span> <span class="n">countStoreSupplier</span> <span class="o">=</span> <span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span> |
| <span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">"Counts"</span><span class="o">),</span> |
| <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> |
| <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span> |
| <span class="o">.</span><span class="na">withLoggingEnabled</span><span class="o">(</span><span class="n">changelogConfig</span><span class="o">);</span> <span class="c1">// enable changelogging, with custom changelog settings</span></code></pre></div> |
| </div> |
| </div> |
| <div class="section" id="timestamped-state-stores"> |
| <span id="streams-developer-guide-state-store-timestamps"></span><h3><a class="toc-backref" href="#id11">Timestamped State Stores</a><a class="headerlink" href="#timestamped-state-stores" title="Permalink to this headline"></a></h3> |
| <p>KTables store timestamps by default. |
| A timestamped state store improves stream processing semantics: it enables |
| handling out-of-order data in source KTables, detecting out-of-order joins and aggregations, |
| and retrieving the timestamp of the latest update in an Interactive Query.</p> |
| <p>You can query timestamped state stores both with and without a timestamp.</p> |
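<p>The two query styles can be illustrated with a hypothetical, stdlib-only model. In Kafka Streams the stored pair type is <code>ValueAndTimestamp</code>. The class and method names below are illustrative assumptions, not the Kafka API. Every write records the value together with its record timestamp, and a query can return either both or only the value.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a timestamped key-value store. Kafka Streams' real pair type is
// org.apache.kafka.streams.state.ValueAndTimestamp; these names are for illustration only.
class TimestampedStoreModel<K, V> {

    // A value plus the timestamp of the record that last updated it.
    static final class ValueAndTs<T> {
        final T value;
        final long timestamp;

        ValueAndTs(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, ValueAndTs<V>> data = new HashMap<>();

    // Every write stores the value together with its record timestamp.
    void put(K key, V value, long timestamp) {
        data.put(key, new ValueAndTs<>(value, timestamp));
    }

    // Query "with timestamp": value and timestamp of the latest update, or null if absent.
    ValueAndTs<V> getWithTimestamp(K key) {
        return data.get(key);
    }

    // Query "without timestamp": only the value, or null if absent.
    V get(K key) {
        ValueAndTs<V> entry = data.get(key);
        return entry == null ? null : entry.value;
    }
}
```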
| <p><b>Upgrade note:</b> All users upgrade with a single rolling bounce per instance.</p> |
| <ul class="first simple"> |
| <li>For Processor API users, nothing changes in existing applications, and you |
| have the option of using the timestamped stores.</li> |
| <li>For DSL operators, store data is upgraded lazily in the background.</li> |
| <li>No upgrade happens if you provide a custom XxxBytesStoreSupplier: the old format is retained, and Streams uses a proxy store |
| that removes timestamps on write and adds them back on read. You can opt in to the new format |
| by implementing the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html">TimestampedBytesStore</a> |
| interface.</li> |
| </ul> |
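<p>The proxy-store idea for custom stores that keep the old format can be sketched as follows. This is a simplified, stdlib-only model, not Streams' actual proxy class. It assumes, for illustration, that a timestamped value is an 8-byte big-endian timestamp followed by the plain value bytes, and that <code>-1</code> stands for an unknown timestamp. On write, the proxy strips the prefix. On read, it adds the prefix back with the unknown marker, because the original timestamp was never stored.</p>

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Simplified model of a proxy that puts a timestamped interface in front of a store that
// keeps the old (plain) value format. ASSUMPTION for illustration: a timestamped value is
// an 8-byte timestamp followed by the plain value bytes, and -1 marks an unknown timestamp.
class TimestampProxyModel {
    static final long UNKNOWN_TIMESTAMP = -1L;

    // The custom store: holds plain values only.
    private final Map<String, byte[]> plainStore = new HashMap<>();

    // Write path: strip the 8-byte timestamp prefix, then store the plain bytes.
    void put(String key, byte[] timestampedValue) {
        plainStore.put(key, Arrays.copyOfRange(timestampedValue, 8, timestampedValue.length));
    }

    // Read path: the original timestamp was not stored, so prefix UNKNOWN_TIMESTAMP.
    byte[] get(String key) {
        byte[] plain = plainStore.get(key);
        return plain == null ? null : withTimestamp(UNKNOWN_TIMESTAMP, plain);
    }

    // What the underlying custom store actually holds for a key.
    byte[] storedBytes(String key) {
        return plainStore.get(key);
    }

    // Builds a timestamped value: 8-byte big-endian timestamp, then the value bytes.
    static byte[] withTimestamp(long timestamp, byte[] plainValue) {
        return ByteBuffer.allocate(8 + plainValue.length).putLong(timestamp).put(plainValue).array();
    }

    // Reads the timestamp prefix of a timestamped value.
    static long timestampOf(byte[] timestampedValue) {
        return ByteBuffer.wrap(timestampedValue).getLong();
    }
}
```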
| </div> |
| <div class="section" id="implementing-custom-state-stores"> |
| <span id="streams-developer-guide-state-store-custom"></span><h3><a class="toc-backref" href="#id7">Implementing Custom State Stores</a><a class="headerlink" href="#implementing-custom-state-stores" title="Permalink to this headline"></a></h3> |
| <p>You can use the <a class="reference internal" href="#streams-developer-guide-state-store-defining"><span class="std std-ref">built-in state store types</span></a> or implement your own. |
| The primary interface to implement for the store is |
| <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStore</span></code>. Kafka Streams also has a few extended interfaces such |
| as <code class="docutils literal"><span class="pre">KeyValueStore</span></code>.</p> |
| <p>Note that your custom <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStore</span></code> implementation also needs to provide the logic for restoring its state |
| via the <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateRestoreCallback</span></code> or <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.BatchingStateRestoreCallback</span></code> interface. |
| Details on how to implement these interfaces can be found in the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/StateStore.html">javadocs</a>.</p> |
| <p>You also need to provide a “builder” for the store by implementing the |
| <code class="docutils literal"><span class="pre">org.apache.kafka.streams.state.StoreBuilder</span></code> interface, which Kafka Streams uses to create instances of |
| your store.</p> |
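<p>Restoration means replaying changelog records into the store. Below is a minimal, stdlib-only sketch. The hypothetical <code>InMemoryCountStore</code> has a <code>restore(byte[] key, byte[] value)</code> method of the same shape as <code>StateRestoreCallback#restore</code>; a real store would implement that Kafka interface instead. The sketch assumes UTF-8 string keys, 8-byte big-endian long values, and that a <code>null</code> value (a tombstone) means the key was deleted.</p>

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory store; restore(...) has the same shape as
// StateRestoreCallback#restore(byte[] key, byte[] value).
class InMemoryCountStore {
    private final Map<String, Long> counts = new HashMap<>();

    // Called once per changelog record, oldest first; the latest record for a key wins.
    void restore(byte[] key, byte[] value) {
        String k = new String(key, StandardCharsets.UTF_8);
        if (value == null) {
            counts.remove(k); // tombstone: the key was deleted
        } else {
            counts.put(k, ByteBuffer.wrap(value).getLong());
        }
    }

    Long get(String key) {
        return counts.get(key);
    }

    int size() {
        return counts.size();
    }

    // Helpers that build changelog records in the assumed format.
    static byte[] keyBytes(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    static byte[] longBytes(long v) {
        return ByteBuffer.allocate(8).putLong(v).array();
    }
}
```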
| </div> |
| </div> |
| <div class="section" id="accessing-processor-context"> |
| <h2><a class="toc-backref" href="#id10">Accessing Processor Context</a><a class="headerlink" href="#accessing-processor-context" title="Permalink to this headline"></a></h2> |
| <p>As mentioned in the <a href="#defining-a-stream-processor">Defining a Stream Processor</a> section, a <code>ProcessorContext</code> controls the processing workflow, for example by scheduling a punctuation function and committing the current processing state.</p> |
| <p>This object can also be used to access application metadata, such as |
| <code class="docutils literal"><span class="pre">applicationId</span></code>, <code class="docutils literal"><span class="pre">taskId</span></code>, |
| and <code class="docutils literal"><span class="pre">stateDir</span></code>, as well as record metadata, such as <code class="docutils literal"><span class="pre">topic</span></code>, |
| <code class="docutils literal"><span class="pre">partition</span></code>, <code class="docutils literal"><span class="pre">offset</span></code>, <code class="docutils literal"><span class="pre">timestamp</span></code>, and |
| <code class="docutils literal"><span class="pre">headers</span></code>.</p> |
| <p>Here is an example of adding a new header to the record being processed:</p> |
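| <div class="highlight-java"><div class="highlight"><pre><span></span><span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="n">String</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> |
| <span class="c1">// context() as provided by AbstractProcessor; header values are byte arrays</span> |
| <span class="n">context</span><span class="o">().</span><span class="na">headers</span><span class="o">().</span><span class="na">add</span><span class="o">(</span><span class="s">"key"</span><span class="o">,</span> <span class="s">"value"</span><span class="o">.</span><span class="na">getBytes</span><span class="o">());</span> |
| <span class="o">}</span></code></pre></div> |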
| </div> |
| <div class="section" id="connecting-processors-and-state-stores"> |
| <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a><a class="headerlink" href="#connecting-processors-and-state-stores" title="Permalink to this headline"></a></h2> |
| <p>Now that a <a class="reference internal" href="#streams-developer-guide-stream-processor"><span class="std std-ref">processor</span></a> (WordCountProcessor) and the |
| state stores have been defined, you can construct the processor topology by connecting these processors and state stores together |
| with the <code class="docutils literal"><span class="pre">Topology</span></code> instance. In addition, you can add source processors with the specified Kafka topics |
| to feed input data streams into the topology, and sink processors with the specified Kafka topics to write |
| output data streams out of the topology.</p> |
| <p>Here is an example implementation:</p> |
| <pre class="line-numbers"><code class="language-java"> Topology builder = new Topology(); |
| // add the source processor node that takes Kafka topic "source-topic" as input |
| builder.addSource("Source", "source-topic") |
| // add the WordCountProcessor node which takes the source processor as its upstream processor |
| .addProcessor("Process", () -> new WordCountProcessor(), "Source") |
| // add the count store associated with the WordCountProcessor processor |
| .addStateStore(countStoreBuilder, "Process") |
| // add the sink processor node that takes Kafka topic "sink-topic" as output |
| // and the WordCountProcessor node as its upstream processor |
| .addSink("Sink", "sink-topic", "Process");</code></pre> |
| <p>Here is a quick explanation of this example:</p> |
| <ul class="simple"> |
| <li>A source processor node named <code class="docutils literal"><span class="pre">"Source"</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic |
| <code class="docutils literal"><span class="pre">"source-topic"</span></code> fed to it.</li> |
| <li>A processor node named <code class="docutils literal"><span class="pre">"Process"</span></code> with the pre-defined <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> logic is then added as the downstream |
| processor of the <code class="docutils literal"><span class="pre">"Source"</span></code> node using the <code class="docutils literal"><span class="pre">addProcessor</span></code> method.</li> |
| <li>A predefined persistent key-value state store is created and associated with the <code class="docutils literal"><span class="pre">"Process"</span></code> node, using |
| <code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li> |
| <li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">"Process"</span></code> node |
| as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">"sink-topic"</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code> |
| to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li> |
| </ul> |
| <p>In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor to the topology. |
| This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code> |
| instead of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this: |
| </p> |
| <pre class="line-numbers"><code class="language-java"> Topology builder = new Topology(); |
| // add the source processor node that takes Kafka "source-topic" as input |
| builder.addSource("Source", "source-topic") |
| // add the WordCountProcessor node which takes the source processor as its upstream processor. |
| // the ProcessorSupplier provides the count store associated with the WordCountProcessor |
| .addProcessor("Process", new ProcessorSupplier<String, String>() { |
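| @Override |
| public Processor<String, String> get() { |
| return new WordCountProcessor(); |
| } |
| // ConnectedStoreProvider#stores() returns a Set of store builders |
| @Override |
| public Set<StoreBuilder<?>> stores() { |
| return Collections.singleton(countStoreBuilder); |
| } |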
| }, "Source") |
| // add the sink processor node that takes Kafka topic "sink-topic" as output |
| // and the WordCountProcessor node as its upstream processor |
| .addSink("Sink", "sink-topic", "Process");</code></pre> |
| <p>This allows a processor to "own" its state stores, hiding how they are wired from the user who assembles the topology. |
| Multiple processors that share a state store may each provide it with this technique, as long as they all return the same <code class="docutils literal"><span class="pre">StoreBuilder</span></code> instance.</p> |
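<p>The "same instance" rule compares builders by object identity, not by equality. The following toy model is not Kafka's implementation. A hypothetical registry accepts the same builder object from several processors, but rejects a different builder object that reuses a store name, even one that looks identical.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for a store builder; only the store name matters here.
class FakeStoreBuilder {
    final String name;

    FakeStoreBuilder(String name) {
        this.name = name;
    }
}

// Toy registry: one builder instance per store name, connected to any number of processors.
class StoreRegistry {
    private final Map<String, FakeStoreBuilder> builders = new HashMap<>();
    private final Map<String, List<String>> connectedProcessors = new HashMap<>();

    void connect(String processorName, FakeStoreBuilder builder) {
        FakeStoreBuilder existing = builders.get(builder.name);
        // Identity check (!=): a second builder object with the same name is rejected.
        if (existing != null && existing != builder) {
            throw new IllegalStateException("A different store builder already uses the name " + builder.name);
        }
        builders.put(builder.name, builder);
        connectedProcessors.computeIfAbsent(builder.name, n -> new ArrayList<>()).add(processorName);
    }

    List<String> processorsFor(String storeName) {
        return connectedProcessors.getOrDefault(storeName, new ArrayList<>());
    }
}
```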
| <p>In these topologies, the <code class="docutils literal"><span class="pre">"Process"</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">"Source"</span></code> node, and an |
| upstream processor of the <code class="docutils literal"><span class="pre">"Sink"</span></code> node. As a result, whenever the <code class="docutils literal"><span class="pre">"Source"</span></code> node forwards a newly fetched record from |
| Kafka to its downstream <code class="docutils literal"><span class="pre">"Process"</span></code> node, the <code class="docutils literal"><span class="pre">WordCountProcessor#process()</span></code> method is triggered to process the record and |
| update the associated state store. Whenever <code class="docutils literal"><span class="pre">context#forward()</span></code> is called in the punctuation function that |
| <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> schedules, the aggregate key-value pair will be sent via the <code class="docutils literal"><span class="pre">"Sink"</span></code> processor node to |
| the Kafka topic <code class="docutils literal"><span class="pre">"sink-topic"</span></code>. Note that in the <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> implementation, you must refer to the |
| same store name <code class="docutils literal"><span class="pre">"Counts"</span></code> when accessing the key-value store, otherwise an exception will be thrown at runtime, |
| indicating that the state store cannot be found. If the state store is not associated with the processor |
| in the <code class="docutils literal"><span class="pre">Topology</span></code> code, accessing it in the processor’s <code class="docutils literal"><span class="pre">init()</span></code> method will also throw an exception at |
| runtime, indicating the state store is not accessible from this processor.</p> |
| <p>Now that you have fully defined your processor topology in your application, you can proceed to |
| <a class="reference internal" href="running-app.html#streams-developer-guide-execution"><span class="std std-ref">running the Kafka Streams application</span></a>.</p> |
| </div> |
| </div> |
| |
| |
| </div> |
| </div> |
| <div class="pagination"> |
| <a href="/{{version}}/documentation/streams/developer-guide/dsl-api" class="pagination__btn pagination__btn__prev">Previous</a> |
| <a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__next">Next</a> |
| </div> |
| </script> |
| |
| <!--#include virtual="../../../includes/_header.htm" --> |
| <!--#include virtual="../../../includes/_top.htm" --> |
| <div class="content documentation "> |
| <!--#include virtual="../../../includes/_nav.htm" --> |
| <div class="right"> |
| <!--#include virtual="../../../includes/_docs_banner.htm" --> |
| <ul class="breadcrumbs"> |
| <li><a href="/documentation">Documentation</a></li> |
| <li><a href="/documentation/streams">Kafka Streams</a></li> |
| <li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li> |
| </ul> |
| <div class="p-content"></div> |
| </div> |
| </div> |
| <!--#include virtual="../../../includes/_footer.htm" --> |
| <script> |
| $(function() { |
| // Show selected style on nav item |
| $('.b-nav__streams').addClass('selected'); |
| |
| //sticky secondary nav |
| var $navbar = $(".sub-nav-sticky"), |
| y_pos = $navbar.offset().top, |
| height = $navbar.height(); |
| |
| $(window).scroll(function() { |
| var scrollTop = $(window).scrollTop(); |
| |
| if (scrollTop > y_pos - height) { |
| $navbar.addClass("navbar-fixed") |
| } else if (scrollTop <= y_pos) { |
| $navbar.removeClass("navbar-fixed") |
| } |
| }); |
| |
| // Display docs subnav items |
| $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded'); |
| }); |
| </script> |