| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <script><!--#include virtual="../../js/templateData.js" --></script> |
| |
| <script id="content-template" type="text/x-handlebars-template"> |
  <div class="sub-nav-sticky">
    <div class="sticky-top">
| </div> |
| </div> |
| |
| <div class="section" id="processor-api"> |
| <span id="streams-developer-guide-processor-api"></span><h1>Processor API<a class="headerlink" href="#processor-api" title="Permalink to this headline"></a></h1> |
    <p>The Processor API allows developers to define and connect custom processors and to interact with state stores. With the
      Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these
      processors with their associated state stores to compose the processor topology that represents your customized processing
      logic.</p>
| <div class="contents local topic" id="table-of-contents"> |
| <p class="topic-title first"><b>Table of Contents</b></p> |
| <ul class="simple"> |
| <li><a class="reference internal" href="#overview" id="id1">Overview</a></li> |
| <li><a class="reference internal" href="#defining-a-stream-processor" id="id2">Defining a Stream |
| Processor</a></li> |
| <li><a class="reference internal" href="#unit-testing-processors" id="id9">Unit Testing Processors</a></li> |
| <li><a class="reference internal" href="#state-stores" id="id3">State Stores</a> |
| <ul> |
| <li><a class="reference internal" href="#defining-and-creating-a-state-store" id="id4">Defining and creating a State Store</a></li> |
| <li><a class="reference internal" href="#fault-tolerant-state-stores" id="id5">Fault-tolerant State Stores</a></li> |
| <li><a class="reference internal" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" id="id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a></li> |
| <li><a class="reference internal" href="#timestamped-state-stores" id="id11">Timestamped State Stores</a></li> |
| <li><a class="reference internal" href="#versioned-state-stores" id="id12">Versioned Key-Value State Stores</a></li> |
| <li><a class="reference internal" href="#implementing-custom-state-stores" id="id7">Implementing Custom State Stores</a></li> |
| </ul> |
| </li> |
| <li><a class="reference internal" href="#connecting-processors-and-state-stores" id="id8">Connecting Processors and State Stores</a></li> |
| <li><a class="reference internal" href="#accessing-processor-context" id="id10">Accessing Processor Context</a></li> |
| </ul> |
| </div> |
| <div class="section" id="overview"> |
| <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#overview" title="Permalink to this headline"></a></h2> |
      <p>The Processor API can be used to implement both <strong>stateless</strong> and <strong>stateful</strong> operations, where the latter is
        achieved through the use of <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a>.</p>
| <div class="admonition tip"> |
| <p><b>Tip</b></p> |
| <p class="last"><strong>Combining the DSL and the Processor API:</strong> |
| You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the |
| section <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-process"><span class="std std-ref">Applying processors and transformers (Processor API integration)</span></a>.</p> |
| </div> |
| <p>For a complete list of available API functionality, see the <a href="/{{version}}/javadoc/org/apache/kafka/streams/package-summary.html">Streams</a> API docs.</p> |
| </div> |
| <div class="section" id="defining-a-stream-processor"> |
| <span id="streams-developer-guide-stream-processor"></span><h2><a class="toc-backref" href="#id2">Defining a Stream Processor</a><a class="headerlink" href="#defining-a-stream-processor" title="Permalink to this headline"></a></h2> |
| <p>A <a class="reference internal" href="../core-concepts.html#streams_processor_node"><span class="std std-ref">stream processor</span></a> is a node in the processor topology that represents a single processing step. |
        With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect
| these processors with their associated state stores to compose the processor topology.</p> |
| <p>You can define a customized stream processor by implementing the <code class="docutils literal"><span class="pre">Processor</span></code> interface, which provides the <code class="docutils literal"><span class="pre">process()</span></code> API method. |
| The <code class="docutils literal"><span class="pre">process()</span></code> method is called on each of the received records.</p> |
      <p>The <code class="docutils literal"><span class="pre">Processor</span></code> interface also has an <code class="docutils literal"><span class="pre">init()</span></code> method, which is called by the Kafka Streams library during the task construction
        phase. Processor instances should perform any required initialization in this method. The <code class="docutils literal"><span class="pre">init()</span></code> method is passed a <code class="docutils literal"><span class="pre">ProcessorContext</span></code>
        instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
        its corresponding message offset, and other such information. You can also use this context instance to schedule a punctuation
| function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>), |
| and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>). |
| Any resources you set up in <code class="docutils literal"><span class="pre">init()</span></code> can be cleaned up in the |
| <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single |
| <code class="docutils literal"><span class="pre">Processor</span></code> object by calling |
| <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p> |
| <p> |
| The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters: |
| <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types |
| that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and |
| <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed |
| to <code class="docutils literal"><span class="pre">process()</span></code>. |
| Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code> |
| define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code> |
| will accept. If your processor does not forward any records at all (or if it only forwards |
| <code class="docutils literal"><span class="pre">null</span></code> keys or values), |
| a best practice is to set the output generic type argument to |
| <code class="docutils literal"><span class="pre">Void</span></code>. |
| If it needs to forward multiple types that don't share a common superclass, you will |
| have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>. |
| </p> |
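      <p>For example, here is a minimal sketch (the class name is illustrative) of a processor that consumes String keys and values
        but never forwards any records, so both output type arguments are set to <code>Void</code>:</p>
      <pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class LoggingProcessor implements Processor&lt;String, String, Void, Void> {

    @Override
    public void init(final ProcessorContext&lt;Void, Void> context) {
        // stateless processor: nothing to initialize
    }

    @Override
    public void process(final Record&lt;String, String> record) {
        // consume the record without forwarding anything downstream
        System.out.println("saw record with key " + record.key());
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}</code></pre>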
| <p> |
| Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code> |
| and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code> |
| methods handle records in the form of the <code class="docutils literal"><span class="pre">Record<K, V></span></code> |
| data class. This class gives you access to the main components of a Kafka record: |
| the key, value, timestamp and headers. When forwarding records, you can use the |
| constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code> |
| from scratch, or you can use the convenience builder methods to replace one of the |
| <code class="docutils literal"><span class="pre">Record</span></code>'s properties |
| and copy over the rest. For example, |
| <code class="docutils literal"><span class="pre">inputRecord.withValue(newValue)</span></code> |
| would copy the key, timestamp, and headers from |
| <code class="docutils literal"><span class="pre">inputRecord</span></code> while |
| setting the output record's value to <code class="docutils literal"><span class="pre">newValue</span></code>. |
        Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>,
        but instead creates a shallow copy. Because the copy is only shallow, if you plan to
        mutate the key, value, or headers elsewhere in the program, you will want to create a
        deep copy of those fields yourself.
| </p> |
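      <p>For example, here is a sketch of a <code>process()</code> implementation that forwards a transformed value while carrying
        over the input record's key, timestamp, and headers, assuming the <code>ProcessorContext</code> was saved to a
        <code>context</code> field in <code>init()</code>:</p>
      <pre class="line-numbers"><code class="language-java">@Override
public void process(final Record&lt;String, String> record) {
    // create a shallow copy of the input record with only the value replaced;
    // the key, timestamp, and headers are carried over unchanged
    final Record&lt;String, String> outputRecord = record.withValue(record.value().toUpperCase());
    context.forward(outputRecord);
}</code></pre>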
| <p> |
| In addition to handling incoming records via |
| <code class="docutils literal"><span class="pre">Processor#process()</span></code>, |
| you have the option to schedule periodic invocation (called "punctuation") |
| in your processor's <code class="docutils literal"><span class="pre">init()</span></code> |
| method by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> |
| and passing it a <code class="docutils literal"><span class="pre">Punctuator</span></code>. |
| The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used |
| for the punctuation scheduling: either <a class="reference internal" href="../core-concepts.html#streams_time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time |
| is configured to represent event-time via <code class="docutils literal"><span class="pre">TimestampExtractor</span></code>). When stream-time is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely |
| by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there |
| is no new input data arriving, stream-time is not advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called.</p> |
| <p>For example, if you schedule a <code class="docutils literal"><span class="pre">Punctuator</span></code> function every 10 seconds based on <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> and if you |
| process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record), |
        then <code class="docutils literal"><span class="pre">punctuate()</span></code> would be called 6 times, regardless of whether processing these 60 records takes a second, a minute, or an hour.</p>
| <p>When wall-clock-time (i.e. <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely by the wall-clock time. |
| Reusing the example above, if the <code class="docutils literal"><span class="pre">Punctuator</span></code> function is scheduled based on <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>, and if these |
        60 records were processed within 20 seconds, <code class="docutils literal"><span class="pre">punctuate()</span></code> is called 2 times (once every 10 seconds). If these 60 records
        were processed within 5 seconds, then <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called at all. Note that you can schedule multiple <code class="docutils literal"><span class="pre">Punctuator</span></code>
        callbacks with different <code class="docutils literal"><span class="pre">PunctuationType</span></code> types within the same processor by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> multiple
        times inside the <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
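      <p>For example, here is a sketch of an <code>init()</code> method that registers one stream-time and one wall-clock-time punctuator:</p>
      <pre class="line-numbers"><code class="language-java">@Override
public void init(final ProcessorContext&lt;String, String> context) {
    // triggered whenever stream-time advances past another 10-second boundary
    context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME,
        timestamp -> System.out.println("stream-time punctuation at " + timestamp));

    // triggered roughly every 10 seconds of wall-clock time, independent of traffic
    context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
        timestamp -> System.out.println("wall-clock punctuation at " + timestamp));
}</code></pre>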
| <div class="admonition attention"> |
| <p class="first admonition-title"><b>Attention</b></p> |
| <p class="last">Stream-time is only advanced when Streams processes records. |
| If there are no records to process, or if Streams is waiting for new records |
| due to the <a class="reference internal" href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a> |
| configuration, then the stream time will not advance and <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified. |
| This behavior is independent of the configured timestamp extractor, i.e., using <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code> does not enable wall-clock triggering of <code class="docutils literal"><span class="pre">punctuate()</span></code>.</p> |
| </div> |
| <p><b>Example</b></p> |
      <p>The following example <code class="docutils literal"><span class="pre">Processor</span></code> implements a simple word-count algorithm; it performs the following actions:</p>
| <ul class="simple"> |
        <li>In the <code class="docutils literal"><span class="pre">init()</span></code> method, schedule the punctuation every 1 second (via <code class="docutils literal"><span class="pre">Duration.ofSeconds(1)</span></code>) and retrieve the local state store by its name &#8220;Counts&#8221;.</li>
| <li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li> |
| <li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li> |
| </ul> |
| <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor<String, String, String, String> { |
| private KeyValueStore<String, Integer> kvStore; |
| |
| @Override |
| public void init(final ProcessorContext<String, String> context) { |
| context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> { |
| try (final KeyValueIterator<String, Integer> iter = kvStore.all()) { |
| while (iter.hasNext()) { |
| final KeyValue<String, Integer> entry = iter.next(); |
| context.forward(new Record<>(entry.key, entry.value.toString(), timestamp)); |
| } |
| } |
| }); |
| kvStore = context.getStateStore("Counts"); |
| } |
| |
| @Override |
| public void process(final Record<String, String> record) { |
| final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+"); |
| |
| for (final String word : words) { |
| final Integer oldValue = kvStore.get(word); |
| |
| if (oldValue == null) { |
| kvStore.put(word, 1); |
| } else { |
| kvStore.put(word, oldValue + 1); |
| } |
| } |
| } |
| |
| @Override |
| public void close() { |
| // close any resources managed by this processor |
| // Note: Do not close any StateStores as these are managed by the library |
| } |
| }</code></pre> |
| <div class="admonition note"> |
| <p><b>Note</b></p> |
| <p class="last"><strong>Stateful processing with state stores:</strong> |
| The <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> defined above can access the currently received record in its <code class="docutils literal"><span class="pre">process()</span></code> method, and it can |
| leverage <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a> to maintain processing states to, for example, remember recently |
| arrived records for stateful processing needs like aggregations and joins. For more information, see the <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a> documentation.</p> |
| </div> |
| </div> |
| <div class="section" id="unit-testing-processors"> |
| <h2> |
| <a class="toc-backref" href="#id9">Unit Testing Processors</a> |
| <a class="headerlink" href="#unit-testing-processors" title="Permalink to this headline"></a> |
| </h2> |
      <p>
        Kafka Streams comes with a <code>test-utils</code> module to help you write unit tests for your
        processors; see the <a href="testing.html#unit-testing-processors">testing documentation</a> for details.
      </p>
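      <p>For example, here is a minimal sketch of a driver-based test for the <code>WordCountProcessor</code> shown above,
        assuming JUnit 5 and an in-memory store named "Counts":</p>
      <pre class="line-numbers"><code class="language-java">import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Test;

@Test
public void shouldCountWords() {
    final Topology topology = new Topology();
    topology.addSource("Source", "source-topic");
    topology.addProcessor("Process", WordCountProcessor::new, "Source");
    topology.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("Counts"),
            Serdes.String(),
            Serdes.Integer()),
        "Process");

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
        final TestInputTopic&lt;String, String> input =
            driver.createInputTopic("source-topic", new StringSerializer(), new StringSerializer());
        input.pipeInput("key", "hello kafka streams hello");

        final KeyValueStore&lt;String, Integer> counts = driver.getKeyValueStore("Counts");
        assertEquals(2, counts.get("hello").intValue());
    }
}</code></pre>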
| </div> |
| <div class="section" id="state-stores"> |
| <span id="streams-developer-guide-state-store"></span><h2><a class="toc-backref" href="#id3">State Stores</a><a class="headerlink" href="#state-stores" title="Permalink to this headline"></a></h2> |
| <p>To implement a <strong>stateful</strong> <code class="docutils literal"><span class="pre">Processor</span></code> or <code class="docutils literal"><span class="pre">Transformer</span></code>, you must provide one or more state stores to the processor |
| or transformer (<em>stateless</em> processors or transformers do not need state stores). State stores can be used to remember |
| recently received input records, to track rolling aggregates, to de-duplicate input records, and more. |
| Another feature of state stores is that they can be |
| <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactively queried</span></a> from other applications, such as a |
| NodeJS-based dashboard or a microservice implemented in Scala or Go.</p> |
| <p>The |
| <a class="reference internal" href="#streams-developer-guide-state-store-defining"><span class="std std-ref">available state store types</span></a> in Kafka Streams have |
| <a class="reference internal" href="#streams-developer-guide-state-store-fault-tolerance"><span class="std std-ref">fault tolerance</span></a> enabled by default.</p> |
| <div class="section" id="defining-and-creating-a-state-store"> |
| <span id="streams-developer-guide-state-store-defining"></span><h3><a class="toc-backref" href="#id4">Defining and creating a State Store</a><a class="headerlink" href="#defining-and-creating-a-state-store" title="Permalink to this headline"></a></h3> |
| <p>You can either use one of the available store types or |
| <a class="reference internal" href="#streams-developer-guide-state-store-custom"><span class="std std-ref">implement your own custom store type</span></a>. |
| It’s common practice to leverage an existing store type via the <code class="docutils literal"><span class="pre">Stores</span></code> factory.</p> |
| <p>Note that, when using Kafka Streams, you normally don’t create or instantiate state stores directly in your code. |
| Rather, you define state stores indirectly by creating a so-called <code class="docutils literal"><span class="pre">StoreBuilder</span></code>. This builder is used by |
| Kafka Streams as a factory to instantiate the actual state stores locally in application instances when and where |
| needed.</p> |
| <p>The following store types are available out of the box.</p> |
| <table border="1" class="non-scrolling-table width-100-percent docutils"> |
| <colgroup> |
| <col width="19%" /> |
| <col width="11%" /> |
| <col width="18%" /> |
| <col width="51%" /> |
| </colgroup> |
| <thead valign="bottom"> |
| <tr class="row-odd"><th class="head">Store Type</th> |
| <th class="head">Storage Engine</th> |
| <th class="head">Fault-tolerant?</th> |
| <th class="head">Description</th> |
| </tr> |
| </thead> |
| <tbody valign="top"> |
| <tr class="row-even"><td>Persistent |
| <code class="docutils literal"><span class="pre">KeyValueStore<K,</span> <span class="pre">V></span></code></td> |
| <td>RocksDB</td> |
| <td>Yes (enabled by default)</td> |
| <td><ul class="first simple"> |
| <li><strong>The recommended store type for most use cases.</strong></li> |
| <li>Stores its data on local disk.</li> |
| <li>Storage capacity: |
| managed local state can be larger than the memory (heap space) of an |
| application instance, but must fit into the available local disk |
| space.</li> |
| <li>RocksDB settings can be fine-tuned, see |
| <a class="reference internal" href="config-streams.html#streams-developer-guide-rocksdb-config"><span class="std std-ref">RocksDB configuration</span></a>.</li> |
| <li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore(java.lang.String)">store variants</a>: |
| timestamped key-value store, versioned key-value store, time window key-value store, session window key-value store.</li> |
| <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore(java.lang.String)">persistentTimestampedKeyValueStore</a> |
| when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries.</li> |
| <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration)">persistentVersionedKeyValueStore</a> |
| when you need a persistent, versioned key-(value/timestamp) store that supports put/get/delete and timestamped get operations.</li> |
| <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore(java.lang.String,java.time.Duration,java.time.Duration,boolean)">persistentWindowStore</a> |
| or <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore(java.lang.String,java.time.Duration,java.time.Duration,boolean)">persistentTimestampedWindowStore</a> |
| when you need a persistent timeWindowedKey-value or timeWindowedKey-(value/timestamp) store, respectively.</li> |
| <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore(java.lang.String,java.time.Duration)">persistentSessionStore</a> |
| when you need a persistent sessionWindowedKey-value store.</li> |
| </ul> |
| <pre class="line-numbers"><code class="language-java">// Creating a persistent key-value store: |
| // here, we create a `KeyValueStore<String, Long>` named "persistent-counts". |
| import org.apache.kafka.streams.state.StoreBuilder; |
| import org.apache.kafka.streams.state.Stores; |
| |
| // Using a `KeyValueStoreBuilder` to build a `KeyValueStore`. |
| StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = |
| Stores.keyValueStoreBuilder( |
| Stores.persistentKeyValueStore("persistent-counts"), |
| Serdes.String(), |
| Serdes.Long()); |
| KeyValueStore<String, Long> countStore = countStoreSupplier.build();</code></pre> |
| </td> |
| </tr> |
| <tr class="row-odd"><td>In-memory |
| <code class="docutils literal"><span class="pre">KeyValueStore<K,</span> <span class="pre">V></span></code></td> |
| <td>-</td> |
| <td>Yes (enabled by default)</td> |
| <td><ul class="first simple"> |
| <li>Stores its data in memory.</li> |
| <li>Storage capacity: |
| managed local state must fit into memory (heap space) of an |
| application instance.</li> |
| <li>Useful when application instances run in an environment where local |
| disk space is either not available or local disk space is wiped |
| in-between app instance restarts.</li> |
| <li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-">store variants</a>: |
| time window key-value store, session window key-value store.</li> |
| <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html">TimestampedKeyValueStore</a> |
| when you need a key-(value/timestamp) store that supports put/get/delete and range queries.</li> |
| <li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html">TimestampedWindowStore</a> |
| when you need to store windowedKey-(value/timestamp) pairs.</li> |
| <li>There is no built-in in-memory, versioned key-value store at this time.</li> |
| </ul> |
| <pre class="line-numbers"><code class="language-java">// Creating an in-memory key-value store: |
| // here, we create a `KeyValueStore<String, Long>` named "inmemory-counts". |
| import org.apache.kafka.streams.state.StoreBuilder; |
| import org.apache.kafka.streams.state.Stores; |
| |
| // Using a `KeyValueStoreBuilder` to build a `KeyValueStore`. |
| StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = |
| Stores.keyValueStoreBuilder( |
| Stores.inMemoryKeyValueStore("inmemory-counts"), |
| Serdes.String(), |
| Serdes.Long()); |
| KeyValueStore<String, Long> countStore = countStoreSupplier.build();</code></pre> |
| </td> |
| </tr> |
| </tbody> |
| </table> |
| </div> |
| <div class="section" id="fault-tolerant-state-stores"> |
| <span id="streams-developer-guide-state-store-fault-tolerance"></span><h3><a class="toc-backref" href="#id5">Fault-tolerant State Stores</a><a class="headerlink" href="#fault-tolerant-state-stores" title="Permalink to this headline"></a></h3> |
        <p>To make state stores fault-tolerant and to allow for state store migration without data loss, a state store can be
          continuously backed up to a Kafka topic behind the scenes, for example to migrate a stateful stream task from one
          machine to another when <a class="reference internal" href="running-app.html#streams-developer-guide-execution-scaling"><span class="std std-ref">elastically adding or removing capacity from your application</span></a>.
| This topic is sometimes referred to as the state store’s associated <em>changelog topic</em>, or its <em>changelog</em>. For example, if |
| you experience machine failure, the state store and the application’s state can be fully restored from its changelog. You can |
| <a class="reference internal" href="#streams-developer-guide-state-store-enable-disable-fault-tolerance"><span class="std std-ref">enable or disable this backup feature</span></a> for a |
| state store.</p> |
| <p>Fault-tolerant state stores are backed by a |
| <a class="reference external" href="https://kafka.apache.org/documentation.html#compaction">compacted</a> changelog topic. The purpose of compacting this |
| topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster, |
| and to minimize recovery time if a state store needs to be restored from its changelog topic.</p> |
| <p>Fault-tolerant windowed state stores are backed by a topic that uses both compaction and |
| deletion. Because of the structure of the message keys that are being sent to the changelog topics, this combination of |
| deletion and compaction is required for the changelog topics of window stores. For window stores, the message keys are |
| composite keys that include the “normal” key and window timestamps. For these types of composite keys it would not |
| be sufficient to only enable compaction to prevent a changelog topic from growing out of bounds. With deletion |
| enabled, old windows that have expired will be cleaned up by Kafka’s log cleaner as the log segments expire. The |
| default retention setting is <code class="docutils literal"><span class="pre">Windows#maintainMs()</span></code> + 1 day. You can override this setting by specifying |
| <code class="docutils literal"><span class="pre">StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG</span></code> in the <code class="docutils literal"><span class="pre">StreamsConfig</span></code>.</p> |
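        <p>For example, the following sketch sets this config to an additional 12 hours:</p>
        <pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// keep window-store changelog records for the window retention plus an extra 12 hours
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 12 * 60 * 60 * 1000L);</code></pre>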
        <p>When you open an <code class="docutils literal"><span class="pre">Iterator</span></code> from a state store, you must call <code class="docutils literal"><span class="pre">close()</span></code> on the iterator when you are done working with
          it in order to reclaim resources, or you can use the iterator from within a try-with-resources statement. If you do not close an iterator,
          you may encounter an out-of-memory (OOM) error.</p>
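        <p>For example, here is a sketch using the <code>countStore</code> from the earlier examples:</p>
        <pre class="line-numbers"><code class="language-java">// the iterator is closed automatically at the end of the try block,
// releasing the underlying (e.g. RocksDB) resources
try (final KeyValueIterator&lt;String, Long> iter = countStore.all()) {
    while (iter.hasNext()) {
        final KeyValue&lt;String, Long> entry = iter.next();
        // work with entry.key and entry.value
    }
}</code></pre>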
| </div> |
| <div class="section" id="enable-or-disable-fault-tolerance-of-state-stores-store-changelogs"> |
| <span id="streams-developer-guide-state-store-enable-disable-fault-tolerance"></span><h3><a class="toc-backref" href="#id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a><a class="headerlink" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" title="Permalink to this headline"></a></h3> |
| <p>You can enable or disable fault tolerance for a state store by enabling or disabling the change logging |
| of the store through <code class="docutils literal"><span class="pre">enableLogging()</span></code> and <code class="docutils literal"><span class="pre">disableLogging()</span></code>. |
| You can also fine-tune the associated topic’s configuration if needed.</p> |
| <p>Example for disabling fault-tolerance:</p> |
| <pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.state.StoreBuilder; |
| import org.apache.kafka.streams.state.Stores; |
| |
| StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder( |
| Stores.persistentKeyValueStore("Counts"), |
| Serdes.String(), |
| Serdes.Long()) |
| .withLoggingDisabled(); // disable backing up the store to a changelog topic</code></pre> |
| <div class="admonition attention"> |
          <p class="first admonition-title"><b>Attention</b></p>
| <p class="last">If the changelog is disabled then the attached state store is no longer fault tolerant and it can’t have any <a class="reference internal" href="config-streams.html#streams-developer-guide-standby-replicas"><span class="std std-ref">standby replicas</span></a>.</p> |
| </div> |
        <p>Here is an example of enabling fault tolerance, with additional changelog-topic configuration:
          you can add any log config from <a class="reference external" href="https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogConfig.scala">kafka.log.LogConfig</a>;
          unrecognized configs will be ignored.</p>
        <pre class="line-numbers"><code class="language-java">import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

Map&lt;String, String> changelogConfig = new HashMap&lt;>();
// override min.insync.replicas
changelogConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1");
| |
| StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder( |
| Stores.persistentKeyValueStore("Counts"), |
| Serdes.String(), |
| Serdes.Long()) |
| .withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings</code></pre> |
| </div> |
| <div class="section" id="timestamped-state-stores"> |
| <span id="streams-developer-guide-state-store-timestamps"></span><h3><a class="toc-backref" href="#id11">Timestamped State Stores</a><a class="headerlink" href="#timestamped-state-stores" title="Permalink to this headline"></a></h3> |
        <p>KTables always store timestamps along with the values by default.
          A timestamped state store improves stream processing semantics and enables
          handling out-of-order data in source KTables, detecting out-of-order joins and aggregations,
          and getting the timestamp of the latest update in an Interactive Query.</p>
| <p>You can query timestamped state stores both with and without a timestamp.</p> |
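        <p>For example, here is a sketch (the store name "ts-counts" is illustrative) that builds a persistent, timestamped
          key-value store and reads back a value together with the timestamp of its latest update:</p>
        <pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

StoreBuilder&lt;TimestampedKeyValueStore&lt;String, Long>> storeBuilder =
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentTimestampedKeyValueStore("ts-counts"),
        Serdes.String(),
        Serdes.Long());

// inside a processor's init() method:
final TimestampedKeyValueStore&lt;String, Long> store = context.getStateStore("ts-counts");

// lookups return the value together with the timestamp of its latest update
final ValueAndTimestamp&lt;Long> valueAndTimestamp = store.get("kafka");
if (valueAndTimestamp != null) {
    final Long value = valueAndTimestamp.value();
    final long lastUpdate = valueAndTimestamp.timestamp();
}</code></pre>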
        <p><b>Upgrade note:</b> All users can upgrade with a single rolling bounce per instance.</p>
| <ul class="first simple"> |
| <li>For Processor API users, nothing changes in existing applications, and you |
| have the option of using the timestamped stores.</li> |
| <li>For DSL operators, store data is upgraded lazily in the background.</li> |
| <li>No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt-in |
| by implementing the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html">TimestampedBytesStore</a> |
| interface. In this case, the old format is retained, and Streams uses a proxy store |
| that removes/adds timestamps on read/write.</li> |
| </ul> |
| </div> |
| <div class="section" id="versioned-state-stores"> |
| <span id="streams-developer-guide-state-store-versioned"></span><h3><a class="toc-backref" href="#id12">Versioned Key-Value State Stores</a><a class="headerlink" href="#versioned-state-stores" title="Permalink to this headline"></a></h3> |
| <p>Versioned key-value state stores are available since Kafka Streams 3.5. |
| Rather than storing a single record version (value and timestamp) per key, |
| versioned state stores may store multiple record versions per key. This |
| allows versioned state stores to support timestamped retrieval operations |
| to return the latest record (per key) as of a specified timestamp.</p> |
| <p>You can create a persistent, versioned state store by passing a |
| <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration)">VersionedBytesStoreSupplier</a> |
| to the |
| <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#versionedKeyValueStoreBuilder(java.lang.String,java.time.Duration)">versionedKeyValueStoreBuilder</a>, |
| or by implementing your own |
| <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/VersionedKeyValueStore.html">VersionedKeyValueStore</a>.</p> |
        <p>Each versioned store has an associated, fixed-duration <em>history retention</em>
          parameter which specifies how long old record versions should be kept for.
| In particular, a versioned store guarantees to return accurate results for |
| timestamped retrieval operations where the timestamp being queried is within |
| history retention of the current observed stream time.</p> |
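        <p>For example, here is a sketch (the store name and retention are illustrative) that builds a versioned store with
          one hour of history retention and performs a timestamped lookup:</p>
        <pre class="line-numbers"><code class="language-java">import java.time.Duration;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;

StoreBuilder&lt;VersionedKeyValueStore&lt;String, Long>> storeBuilder =
    Stores.versionedKeyValueStoreBuilder(
        Stores.persistentVersionedKeyValueStore("versioned-counts", Duration.ofHours(1)),
        Serdes.String(),
        Serdes.Long());

// inside a processor, with the store retrieved via context.getStateStore("versioned-counts"):
store.put("kafka", 42L, record.timestamp());               // write with an explicit timestamp
final VersionedRecord&lt;Long> latest = store.get("kafka");   // latest available version for the key
final VersionedRecord&lt;Long> asOf =
    store.get("kafka", record.timestamp() - 60_000L);       // latest version as of one minute earlier</code></pre>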
        <p>History retention also doubles as the store&#8217;s <em>grace period</em>, which determines
| how far back in time out-of-order writes to the store will be accepted. A |
| versioned store will not accept writes (inserts, updates, or deletions) if |
| the timestamp associated with the write is older than the current observed |
| stream time by more than the grace period. Stream time in this context is |
| tracked per-partition, rather than per-key, which means it's important |
| that grace period (i.e., history retention) be set high enough to |
| accommodate a record with one key arriving out-of-order relative to a |
| record for another key.</p> |
| <p>Because the memory footprint of versioned key-value stores is higher than |
| that of non-versioned key-value stores, you may want to adjust your |
| <a class="reference internal" href="memory-mgmt.html#streams-developer-guide-memory-management-rocksdb"><span class="std std-ref">RocksDB memory settings</span></a> |
| accordingly. Benchmarking your application with versioned stores is also |
| advised as performance is expected to be worse than when using non-versioned |
| stores.</p> |
| <p>Versioned stores do not support caching or interactive queries at this time. |
| Also, window stores and global tables may not be versioned.</p> |
        <p><b>Upgrade note:</b> Versioned state stores are opt-in only; no automatic
          upgrades from non-versioned to versioned stores will take place.</p>
| <p>Upgrades are supported from persistent, non-versioned key-value stores |
| to persistent, versioned key-value stores as long as the original store |
| has the same changelog topic format as the versioned store being upgraded |
| to. Both persistent |
| <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore(java.lang.String)">key-value stores</a> |
| and <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore(java.lang.String)">timestamped key-value stores</a> |
| share the same changelog topic format as |
| <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration)">persistent versioned key-value stores</a>, |
| and therefore both are eligible for upgrades.</p> |
| <p>If you wish to upgrade an application using persistent, non-versioned |
| key-value stores to use persistent, versioned key-value stores |
| instead, you can perform the following procedure:</p> |
| <ul class="first simple"> |
| <li>Stop all application instances, and |
| <a class="reference internal" href="app-reset-tool.html#streams-developer-guide-reset-local-environment"><span class="std std-ref">clear any local state directories</span></a> |
| for the store(s) being upgraded.</li> |
| <li>Update your application code to use versioned stores where desired.</li> |
| <li>Update your changelog topic configs, for the relevant state stores, |
| to set the value of <code class="docutils literal"><span class="pre">min.compaction.lag.ms</span></code> |
| to be at least your desired history retention. History retention plus |
            one day is recommended as a buffer for the use of broker wall clock time
| during compaction.</li> |
          <li>Restart your application instances and allow time for the versioned
            stores to rebuild state from the changelog.</li>
| </ul> |
| </div> |
| <div class="section" id="implementing-custom-state-stores"> |
| <span id="streams-developer-guide-state-store-custom"></span><h3><a class="toc-backref" href="#id7">Implementing Custom State Stores</a><a class="headerlink" href="#implementing-custom-state-stores" title="Permalink to this headline"></a></h3> |
| <p>You can use the <a class="reference internal" href="#streams-developer-guide-state-store-defining"><span class="std std-ref">built-in state store types</span></a> or implement your own. |
| The primary interface to implement for the store is |
| <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStore</span></code>. Kafka Streams also has a few extended interfaces such |
| as <code class="docutils literal"><span class="pre">KeyValueStore</span></code> and <code class="docutils literal"><span class="pre">VersionedKeyValueStore</span></code>.</p> |
| <p>Note that your customized <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStore</span></code> implementation also needs to provide the logic on how to restore the state |
| via the <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateRestoreCallback</span></code> or <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.BatchingStateRestoreCallback</span></code> interface. |
| Details on how to instantiate these interfaces can be found in the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/StateStore.html">javadocs</a>.</p> |
| <p>You also need to provide a “builder” for the store by implementing the |
| <code class="docutils literal"><span class="pre">org.apache.kafka.streams.state.StoreBuilder</span></code> interface, which Kafka Streams uses to create instances of |
| your store.</p> |
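        <p>For example, here is a minimal sketch of a custom in-memory store; the class name and the simplistic map-based storage
          are illustrative only, and depending on your Kafka Streams version you may also need to implement the deprecated
          <code>init(ProcessorContext, StateStore)</code> overload:</p>
        <pre class="line-numbers"><code class="language-java">import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;

public class MyCustomStore implements StateStore {
    private final String name;
    private final Map&lt;Bytes, byte[]> storage = new HashMap&lt;>();
    private volatile boolean open = false;

    public MyCustomStore(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        // register the store along with the restore logic that Kafka Streams
        // invokes to rebuild local state from the changelog topic
        context.register(root, (key, value) -> storage.put(Bytes.wrap(key), value));
        open = true;
    }

    @Override
    public void flush() {
        // nothing to flush for a purely in-memory store
    }

    @Override
    public void close() {
        storage.clear();
        open = false;
    }

    @Override
    public boolean persistent() {
        return false;
    }

    @Override
    public boolean isOpen() {
        return open;
    }
}</code></pre>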
| </div> |
| </div> |
| <div class="section" id="accessing-processor-context"> |
| <h2><a class="toc-backref" href="#id10">Accessing Processor Context</a><a class="headerlink" href="#accessing-processor-context" title="Permalink to this headline"></a></h2> |
      <p>As mentioned in the <a href=#defining-a-stream-processor>Defining a Stream Processor</a> section, a <code>ProcessorContext</code> controls the processing workflow, such as scheduling a punctuation function and committing the current processing progress.</p>
      <p>This object can also be used to access metadata related to the application, such as
        <code class="docutils literal"><span class="pre">applicationId</span></code>, <code class="docutils literal"><span class="pre">taskId</span></code>,
        and <code class="docutils literal"><span class="pre">stateDir</span></code>, as well as record-related metadata such as <code class="docutils literal"><span class="pre">topic</span></code>,
        <code class="docutils literal"><span class="pre">partition</span></code>, <code class="docutils literal"><span class="pre">offset</span></code>, <code class="docutils literal"><span class="pre">timestamp</span></code> and
        <code class="docutils literal"><span class="pre">headers</span></code>.</p>
      <p>Here is an example of how to add a new header to the record before forwarding it downstream (note that header values
        are byte arrays, and that the <code>ProcessorContext</code> is assumed to have been saved to a <code>context</code> field in <code>init()</code>):</p>
      <pre class="line-numbers"><code class="language-java">@Override
public void process(final Record&lt;String, String> record) {
    // add a header to the record
    record.headers().add("key", "value".getBytes(StandardCharsets.UTF_8));
    context.forward(record);
}</code></pre>
    </div>
    <div class="section" id="connecting-processors-and-state-stores">
| <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a><a class="headerlink" href="#connecting-processors-and-state-stores" title="Permalink to this headline"></a></h2> |
| <p>Now that a <a class="reference internal" href="#streams-developer-guide-stream-processor"><span class="std std-ref">processor</span></a> (WordCountProcessor) and the |
| state stores have been defined, you can construct the processor topology by connecting these processors and state stores together by |
| using the <code class="docutils literal"><span class="pre">Topology</span></code> instance. In addition, you can add source processors with the specified Kafka topics |
| to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate |
| output data streams out of the topology.</p> |
| <p>Here is an example implementation:</p> |
| <pre class="line-numbers"><code class="language-java">Topology builder = new Topology(); |
| // add the source processor node that takes Kafka topic "source-topic" as input |
| builder.addSource("Source", "source-topic") |
| // add the WordCountProcessor node which takes the source processor as its upstream processor |
| .addProcessor("Process", () -> new WordCountProcessor(), "Source") |
| // add the count store associated with the WordCountProcessor processor |
| .addStateStore(countStoreBuilder, "Process") |
| // add the sink processor node that takes Kafka topic "sink-topic" as output |
| // and the WordCountProcessor node as its upstream processor |
| .addSink("Sink", "sink-topic", "Process");</code></pre> |
| <p>Here is a quick explanation of this example:</p> |
| <ul class="simple"> |
| <li>A source processor node named <code class="docutils literal"><span class="pre">"Source"</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic |
| <code class="docutils literal"><span class="pre">"source-topic"</span></code> fed to it.</li> |
| <li>A processor node named <code class="docutils literal"><span class="pre">"Process"</span></code> with the pre-defined <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> logic is then added as the downstream |
| processor of the <code class="docutils literal"><span class="pre">"Source"</span></code> node using the <code class="docutils literal"><span class="pre">addProcessor</span></code> method.</li> |
| <li>A predefined persistent key-value state store is created and associated with the <code class="docutils literal"><span class="pre">"Process"</span></code> node, using |
| <code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li> |
| <li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">"Process"</span></code> node |
| as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">"sink-topic"</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code> |
| to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li> |
| </ul> |
| <p>In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor to the topology. |
| This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code> |
| instead of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this: |
| </p> |
| <pre class="line-numbers"><code class="language-java">Topology builder = new Topology(); |
| // add the source processor node that takes Kafka "source-topic" as input |
| builder.addSource("Source", "source-topic") |
| // add the WordCountProcessor node which takes the source processor as its upstream processor. |
| // the ProcessorSupplier provides the count store associated with the WordCountProcessor |
| .addProcessor("Process", new ProcessorSupplier<String, String, String, String>() { |
| public Processor<String, String, String, String> get() { |
| return new WordCountProcessor(); |
| } |
| |
| public Set<StoreBuilder<?>> stores() { |
| final StoreBuilder<KeyValueStore<String, Long>> countsStoreBuilder = |
| Stores |
| .keyValueStoreBuilder( |
| Stores.persistentKeyValueStore("Counts"), |
| Serdes.String(), |
| Serdes.Long() |
| ); |
| return Collections.singleton(countsStoreBuilder); |
| } |
| }, "Source") |
| // add the sink processor node that takes Kafka topic "sink-topic" as output |
| // and the WordCountProcessor node as its upstream processor |
| .addSink("Sink", "sink-topic", "Process");</code></pre> |
      <p>This allows a processor to "own" state stores, effectively encapsulating their usage from the user wiring the topology.
        Multiple processors that share a state store may provide the same store with this technique, as long as the <code class="docutils literal"><span class="pre">StoreBuilder</span></code> is the same instance.</p>
| <p>In these topologies, the <code class="docutils literal"><span class="pre">"Process"</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">"Source"</span></code> node, and an |
| upstream processor of the <code class="docutils literal"><span class="pre">"Sink"</span></code> node. As a result, whenever the <code class="docutils literal"><span class="pre">"Source"</span></code> node forwards a newly fetched record from |
| Kafka to its downstream <code class="docutils literal"><span class="pre">"Process"</span></code> node, the <code class="docutils literal"><span class="pre">WordCountProcessor#process()</span></code> method is triggered to process the record and |
| update the associated state store. Whenever <code class="docutils literal"><span class="pre">context#forward()</span></code> is called in the |
| <code class="docutils literal"><span class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate key-value pair will be sent via the <code class="docutils literal"><span class="pre">"Sink"</span></code> processor node to |
| the Kafka topic <code class="docutils literal"><span class="pre">"sink-topic"</span></code>. Note that in the <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> implementation, you must refer to the |
| same store name <code class="docutils literal"><span class="pre">"Counts"</span></code> when accessing the key-value store, otherwise an exception will be thrown at runtime, |
| indicating that the state store cannot be found. If the state store is not associated with the processor |
| in the <code class="docutils literal"><span class="pre">Topology</span></code> code, accessing it in the processor’s <code class="docutils literal"><span class="pre">init()</span></code> method will also throw an exception at |
| runtime, indicating the state store is not accessible from this processor.</p> |
      <p>Note that the <code class="docutils literal"><span class="pre">Topology#addProcessor</span></code> function takes a <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code> as argument, and that the supplier pattern requires that a new
        <code class="docutils literal"><span class="pre">Processor</span></code> instance is returned each time <code class="docutils literal"><span class="pre">ProcessorSupplier#get()</span></code> is called. Returning the same object reference
        from <code class="docutils literal"><span class="pre">ProcessorSupplier#get()</span></code> violates the supplier pattern and leads to runtime exceptions, so never provide a singleton
        <code class="docutils literal"><span class="pre">Processor</span></code> instance to the <code class="docutils literal"><span class="pre">Topology</span></code>.</p>
| <p>Now that you have fully defined your processor topology in your application, you can proceed to |
| <a class="reference internal" href="running-app.html#streams-developer-guide-execution"><span class="std std-ref">running the Kafka Streams application</span></a>.</p> |
| </div> |
| |
| |
| </div> |
| </div> |
| <div class="pagination"> |
| <a href="/{{version}}/documentation/streams/developer-guide/dsl-api" class="pagination__btn pagination__btn__prev">Previous</a> |
| <a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__next">Next</a> |
| </div> |
| </script> |
| |
| <!--#include virtual="../../../includes/_header.htm" --> |
| <!--#include virtual="../../../includes/_top.htm" --> |
| <div class="content documentation "> |
| <!--#include virtual="../../../includes/_nav.htm" --> |
| <div class="right"> |
| <!--//#include virtual="../../../includes/_docs_banner.htm" --> |
| <ul class="breadcrumbs"> |
| <li><a href="/documentation">Documentation</a></li> |
| <li><a href="/documentation/streams">Kafka Streams</a></li> |
| <li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li> |
| </ul> |
| <div class="p-content"></div> |
| </div> |
| </div> |
| <!--#include virtual="../../../includes/_footer.htm" --> |
| <script> |
| $(function() { |
| // Show selected style on nav item |
| $('.b-nav__streams').addClass('selected'); |
| |
| //sticky secondary nav |
| var $navbar = $(".sub-nav-sticky"), |
| y_pos = $navbar.offset().top, |
| height = $navbar.height(); |
| |
| $(window).scroll(function() { |
| var scrollTop = $(window).scrollTop(); |
| |
| if (scrollTop > y_pos - height) { |
| $navbar.addClass("navbar-fixed") |
| } else if (scrollTop <= y_pos) { |
| $navbar.removeClass("navbar-fixed") |
| } |
| }); |
| |
| // Display docs subnav items |
| $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded'); |
| }); |
| </script> |