<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
<h1>Developer Guide</h1>
<p>
There is a <a href="/{{version}}/documentation/#quickstart_kafkastreams">quickstart</a> example that shows how to run a stream processing program coded in the Kafka Streams library.
This section focuses on how to write, configure, and execute a Kafka Streams application.
</p>
<p>
As we have mentioned above, the computational logic of a Kafka Streams application is defined as a <a href="#streams_topology">processor topology</a>.
Currently Kafka Streams provides two sets of APIs to define the processor topology, which will be described in the subsequent sections.
</p>
<h3><a id="streams_processor" href="#streams_processor">Low-Level Processor API</a></h3>
<h4><a id="streams_processor_process" href="#streams_processor_process">Processor</a></h4>
<p>
As mentioned in the <a href="#streams_concepts">Core Concepts</a> section, a stream processor is a node in the processor topology that represents a single processing step.
With the <code>Processor</code> API developers can define arbitrary stream processors that process one received record at a time, and connect these processors with
their associated state stores to compose the processor topology that represents their customized processing logic.
</p>
<p>
The <code>Processor</code> interface provides two main API methods:
<code>process</code> and <code>punctuate</code>. The <code>process</code> method is called for each
received record, and the <code>punctuate</code> method is called periodically based on elapsed time.
In addition, the processor can maintain the current <code>ProcessorContext</code> instance variable initialized in the
<code>init</code> method, and use the context to schedule the punctuation period (<code>context().schedule</code>), to
forward the modified / new key-value pair to downstream processors (<code>context().forward</code>), to commit the current
processing progress (<code>context().commit</code>), etc.
</p>
<p>
The following example <code>Processor</code> implementation defines a simple word-count algorithm:
</p>
<pre>
public class MyProcessor implements Processor&lt;String, String&gt; {
private ProcessorContext context;
private KeyValueStore&lt;String, Long&gt; kvStore;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
// keep the processor context locally because we need it in punctuate() and commit()
this.context = context;
// call this processor's punctuate() method every 1000 milliseconds.
this.context.schedule(1000);
// retrieve the key-value store named "Counts"
this.kvStore = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
}
@Override
public void process(String dummy, String line) {
String[] words = line.toLowerCase().split(" ");
for (String word : words) {
Long oldValue = this.kvStore.get(word);
if (oldValue == null) {
this.kvStore.put(word, 1L);
} else {
this.kvStore.put(word, oldValue + 1L);
}
}
}
@Override
public void punctuate(long timestamp) {
KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();
while (iter.hasNext()) {
KeyValue&lt;String, Long&gt; entry = iter.next();
context.forward(entry.key, entry.value.toString());
}
iter.close();
// commit the current processing progress
context.commit();
}
@Override
public void close() {
// close any resources managed by this processor.
// Note: Do not close any StateStores as these are managed
// by the library
}
}
</pre>
<p>
In the above implementation, the following actions are performed:
</p>
<ul>
<li>In the <code>init</code> method, schedule the punctuation every 1 second and retrieve the local state store by its name "Counts".</li>
<li>In the <code>process</code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this feature later in the section).</li>
<li>In the <code>punctuate</code> method, iterate the local state store and send the aggregated counts to the downstream processor, and commit the current stream state.</li>
</ul>
<h4><a id="streams_processor_topology" href="#streams_processor_topology">Processor Topology</a></h4>
<p>
With the customized processors defined in the Processor API, developers can use the <code>TopologyBuilder</code> to build a processor topology
by connecting these processors together:
</p>
<pre>
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "src-topic")
// add "PROCESS1" node which takes the source processor "SOURCE" as its upstream processor
.addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")
// add "PROCESS2" node which takes "PROCESS1" as its upstream processor
.addProcessor("PROCESS2", () -> new MyProcessor2(), "PROCESS1")
// add "PROCESS3" node which takes "PROCESS1" as its upstream processor
.addProcessor("PROCESS3", () -> new MyProcessor3(), "PROCESS1")
// add the sink processor node "SINK1" that takes Kafka topic "sink-topic1"
// as output and the "PROCESS1" node as its upstream processor
.addSink("SINK1", "sink-topic1", "PROCESS1")
// add the sink processor node "SINK2" that takes Kafka topic "sink-topic2"
// as output and the "PROCESS2" node as its upstream processor
.addSink("SINK2", "sink-topic2", "PROCESS2")
// add the sink processor node "SINK3" that takes Kafka topic "sink-topic3"
// as output and the "PROCESS3" node as its upstream processor
.addSink("SINK3", "sink-topic3", "PROCESS3");
</pre>
There are several steps in the above code to build the topology, and here is a quick walkthrough:
<ul>
<li>First of all a source node named "SOURCE" is added to the topology using the <code>addSource</code> method, with one Kafka topic "src-topic" fed to it.</li>
<li>Three processor nodes are then added using the <code>addProcessor</code> method; here the first processor is a child of the "SOURCE" node, but is the parent of the other two processors.</li>
<li>Finally three sink nodes are added to complete the topology using the <code>addSink</code> method, each piping from a different parent processor node and writing to a separate topic.</li>
</ul>
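<p>
The parent/child wiring described in the walkthrough above can be checked with a small plain-Java model. This is only a sketch: <code>TopologySketch</code>, <code>addNode</code>, and <code>pathsFrom</code> are hypothetical names introduced for illustration and are not part of the Kafka Streams API. The model records which node feeds which and then lists every route a record can take from the source to a sink.
</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the wiring that a topology builder records; not a Kafka class.
final class TopologySketch {
    // parent node name -> names of its child nodes, in the order they were added
    private final Map<String, List<String>> children = new LinkedHashMap<>();

    // Register a node and attach it to each of its parents (a source node has none).
    TopologySketch addNode(String name, String... parents) {
        children.putIfAbsent(name, new ArrayList<>());
        for (String parent : parents) {
            if (!children.containsKey(parent)) {
                throw new IllegalArgumentException("Unknown parent node: " + parent);
            }
            children.get(parent).add(name);
        }
        return this;
    }

    // Collect every path from the given (already registered) node down to a leaf node.
    List<List<String>> pathsFrom(String node) {
        List<List<String>> result = new ArrayList<>();
        collect(node, new ArrayList<>(), result);
        return result;
    }

    private void collect(String node, List<String> prefix, List<List<String>> out) {
        List<String> path = new ArrayList<>(prefix);
        path.add(node);
        List<String> kids = children.get(node);
        if (kids.isEmpty()) {
            out.add(path); // a leaf: in this example, always a sink node
            return;
        }
        for (String kid : kids) {
            collect(kid, path, out);
        }
    }

    // Same node names and parent relationships as the TopologyBuilder example above.
    static TopologySketch example() {
        return new TopologySketch()
            .addNode("SOURCE")
            .addNode("PROCESS1", "SOURCE")
            .addNode("PROCESS2", "PROCESS1")
            .addNode("PROCESS3", "PROCESS1")
            .addNode("SINK1", "PROCESS1")
            .addNode("SINK2", "PROCESS2")
            .addNode("SINK3", "PROCESS3");
    }

    public static void main(String[] args) {
        for (List<String> path : example().pathsFrom("SOURCE")) {
            System.out.println(String.join(" -> ", path));
        }
    }
}
```

<p>
Every route passes through "PROCESS1", which matches the description of the first processor as the parent of the other two.
</p>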
<h4><a id="streams_processor_statestore" href="#streams_processor_statestore">State Stores</a></h4>
<p>
Note that the <code>Processor</code> API is not limited to only accessing the current records as they arrive in the <code>process()</code> method, but can also maintain processing states
that keep recently arrived records to use in stateful processing operations such as windowed joins or aggregation.
To take advantage of these states, users can define a state store by implementing the <code>StateStore</code> interface (the Kafka Streams library also has a few extended interfaces such as <code>KeyValueStore</code>);
in practice, though, users usually do not need to customize such a state store from scratch but can simply use the <code>Stores</code> factory to define a state store by specifying whether it should be persistent, log-backed, etc.
In the following example, a persistent key-value store named Counts with key type <code>String</code> and value type <code>Long</code> is created.
</p>
<pre>
StateStoreSupplier countStore = Stores.create("Counts")
.withKeys(Serdes.String())
.withValues(Serdes.Long())
.persistent()
.build();
</pre>
<p>
To take advantage of these state stores, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the
processor topology to create the local state and associate it with the processor nodes that need to access it; or they can connect a created
state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
</p>
<pre>
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "src-topic")
.addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
// add the created state store "Counts" associated with processor "PROCESS1"
.addStateStore(countStore, "PROCESS1")
.addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
.addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
// connect the state store "Counts" with processor "PROCESS2"
.connectProcessorAndStateStores("PROCESS2", "Counts")
.addSink("SINK1", "sink-topic1", "PROCESS1")
.addSink("SINK2", "sink-topic2", "PROCESS2")
.addSink("SINK3", "sink-topic3", "PROCESS3");
</pre>
In the next section we present another way to build the processor topology: the Kafka Streams DSL.
<br>
<h3><a id="streams_dsl" href="#streams_dsl">High-Level Streams DSL</a></h3>
To build a processor topology using the Streams DSL, developers can use the <code>KStreamBuilder</code> class, which extends <code>TopologyBuilder</code>.
A simple example is included with the source code for Kafka in the <code>streams/examples</code> package. The rest of this section walks
through some code to demonstrate the key steps in creating a topology using the Streams DSL, but we recommend that developers read the full example source
code for details.
<h4><a id="streams_duality" href="#streams_duality">Duality of Streams and Tables</a></h4>
<p>
Before we discuss concepts such as aggregations in Kafka Streams we must first introduce tables, and most importantly the relationship between tables and streams:
the so-called <a href="https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying/">stream-table duality</a>.
Essentially, this duality means that a stream can be viewed as a table, and vice versa. Kafka's log compaction feature, for example, exploits this duality.
</p>
<p>
A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:
</p>
<img class="centered" src="/{{version}}/images/streams-table-duality-01.png">
The <b>stream-table duality</b> describes the close relationship between streams and tables.
<ul>
<li><b>Stream as Table</b>: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a “real” table by replaying the changelog from beginning to end to reconstruct the table. Similarly, in a more general analogy, aggregating data records in a stream – such as computing the total number of pageviews by user from a stream of pageview events – will return a table (here with the key and the value being the user and its corresponding pageview count, respectively).</li>
<li><b>Table as Stream</b>: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream's data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a “real” stream by iterating over each key-value entry in the table.</li>
</ul>
<p>
Let's illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time and different revisions of the table can be represented as a changelog stream (second column).
</p>
<img class="centered" src="/{{version}}/images/streams-table-duality-02.png" style="width:300px">
<p>
Interestingly, because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):
</p>
<img class="centered" src="/{{version}}/images/streams-table-duality-03.png" style="width:600px">
<p>
The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault-tolerance.
The stream-table duality is such an important concept that Kafka Streams models it explicitly via the <a href="#streams_kstream_ktable">KStream, KTable, and GlobalKTable</a> interfaces, which we describe in the next sections.
</p>
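<p>
Both directions of the duality can be sketched in a few lines of plain Java. The class and method names below (<code>DualitySketch</code>, <code>replay</code>, <code>toStream</code>) and the pageview counts are made up for illustration; nothing here is a Kafka Streams API. Replaying a changelog yields a table, and iterating over that table yields a stream that reconstructs the same table.
</p>

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the stream-table duality; none of these names are Kafka APIs.
final class DualitySketch {
    // Helper that builds one key-value record.
    static Map.Entry<String, Long> record(String key, Long value) {
        return new SimpleEntry<>(key, value);
    }

    // Stream as table: replay the changelog from beginning to end; the last value per key wins.
    static Map<String, Long> replay(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Map.Entry<String, Long> change : changelog) {
            table.put(change.getKey(), change.getValue());
        }
        return table;
    }

    // Table as stream: emit one record per key-value entry of the table.
    static List<Map.Entry<String, Long>> toStream(Map<String, Long> table) {
        List<Map.Entry<String, Long>> stream = new ArrayList<>();
        for (Map.Entry<String, Long> entry : table.entrySet()) {
            stream.add(record(entry.getKey(), entry.getValue()));
        }
        return stream;
    }

    // Illustrative changelog of pageview counts per user (made-up values).
    static List<Map.Entry<String, Long>> exampleChangelog() {
        return Arrays.asList(
            record("alice", 1L),
            record("charlie", 1L),
            record("alice", 2L),
            record("bob", 1L));
    }

    public static void main(String[] args) {
        Map<String, Long> table = replay(exampleChangelog());
        System.out.println(table);                   // prints {alice=2, charlie=1, bob=1}
        System.out.println(replay(toStream(table))); // prints {alice=2, charlie=1, bob=1}
    }
}
```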
<h5><a id="streams_kstream_ktable" href="#streams_kstream_ktable">KStream, KTable, and GlobalKTable</a></h5>
The DSL uses three main abstractions. A <b>KStream</b> is an abstraction of a record stream, where each data record represents a self-contained datum in the unbounded data set.
A <b>KTable</b> is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is considered to be an update of the last value for the same record key,
if any (if a corresponding key doesn't exist yet, the update will be considered a create).
Like a <b>KTable</b>, a <b>GlobalKTable</b> is an abstraction of a changelog stream, where each data record represents an update.
However, a <b>GlobalKTable</b> is different from a <b>KTable</b> in that it is fully replicated on each KafkaStreams instance.
<b>GlobalKTable</b> also provides the ability to look up current values of data records by keys.
This table-lookup functionality is available through <a href="#streams_dsl_joins">join operations</a>.
To illustrate the difference between KStreams and KTables/GlobalKTables, let's imagine the following two data records are being sent to the stream:
<pre>
("alice", 1) --> ("alice", 3)
</pre>
If these records were a KStream and the stream processing application were to sum the values, it would return <code>4</code>. If these records were a KTable or GlobalKTable, it would return <code>3</code>, since the last record would be considered an update.
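<p>
The two interpretations can be compared with a minimal plain-Java sketch. The class and method names are hypothetical and only model the semantics; they do not use the <code>KStream</code> or <code>KTable</code> classes.
</p>

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two ways to read the same records; not a Kafka API.
final class RecordVsChangelogSketch {
    static Map.Entry<String, Integer> record(String key, Integer value) {
        return new SimpleEntry<>(key, value);
    }

    // Record-stream reading: every record is an independent datum, so all values are summed.
    static int sumAsRecordStream(List<Map.Entry<String, Integer>> records) {
        int sum = 0;
        for (Map.Entry<String, Integer> r : records) {
            sum += r.getValue();
        }
        return sum;
    }

    // Changelog reading: a later record replaces the earlier value for the same key,
    // so only the latest value per key contributes to the sum.
    static int sumAsChangelog(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> latest = new HashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            latest.put(r.getKey(), r.getValue());
        }
        int sum = 0;
        for (int value : latest.values()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records =
            Arrays.asList(record("alice", 1), record("alice", 3));
        System.out.println(sumAsRecordStream(records)); // prints 4
        System.out.println(sumAsChangelog(records));    // prints 3
    }
}
```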
<h4><a id="streams_dsl_source" href="#streams_dsl_source">Create Source Streams from Kafka</a></h4>
<p>
Either a <b>record stream</b> (defined as <code>KStream</code>) or a <b>changelog stream</b> (defined as <code>KTable</code> or <code>GlobalKTable</code>)
can be created as a source stream from one or more Kafka topics (for <code>KTable</code> and <code>GlobalKTable</code> you can only create the source stream
from a single topic).
</p>
<pre>
KStreamBuilder builder = new KStreamBuilder();
KStream&lt;String, GenericRecord&gt; source1 = builder.stream("topic1", "topic2");
KTable&lt;String, GenericRecord&gt; source2 = builder.table("topic3", "stateStoreName");
GlobalKTable&lt;String, GenericRecord&gt; source3 = builder.globalTable("topic4", "globalStoreName");
</pre>
<h4><a id="streams_dsl_windowing" href="#streams_dsl_windowing">Windowing a stream</a></h4>
A stream processor may need to divide data records into time buckets, i.e. to <b>window</b> the stream by time. This is usually needed for join and aggregation operations, etc. Kafka Streams currently defines the following types of windows:
<ul>
<li><b>Hopping time windows</b> are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window's size and its advance interval (aka "hop"). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap, a data record may belong to more than one such window.</li>
<li><b>Tumbling time windows</b> are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window's size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.</li>
<li><b>Sliding windows</b> model a fixed-size window that slides continuously over the time axis; here, two data records are said to be included in the same window if the difference of their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch, but on the data record timestamps. In Kafka Streams, sliding windows are used only for join operations, and can be specified through the <code>JoinWindows</code> class.</li>
<li><b>Session windows</b> are used to aggregate key-based events into sessions.
Sessions represent a period of activity separated by a defined gap of inactivity.
Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions.
If the event falls outside of the session gap, then a new session will be created.
Session windows are tracked independently across keys (e.g. windows of different keys typically have different start and end times) and their sizes vary (even windows for the same key typically have different sizes);
as such session windows can't be pre-computed and are instead derived from analyzing the timestamps of the data records.
</li>
</ul>
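<p>
To make the hopping and tumbling definitions above concrete, the following plain-Java sketch computes which windows a record timestamp falls into. It assumes windows are aligned to the epoch and cover the half-open interval from their start (inclusive) to start plus size (exclusive); <code>WindowAssignmentSketch</code> and <code>windowStartsFor</code> are hypothetical names, not the library's implementation.
</p>

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of epoch-aligned window assignment; not Kafka's implementation.
final class WindowAssignmentSketch {
    // Returns the start times of all windows [start, start + sizeMs) that contain timestampMs.
    // Assumes a non-negative timestamp and 0 < advanceMs <= sizeMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        if (sizeMs <= 0 || advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException("require 0 < advanceMs <= sizeMs");
        }
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp (never before the epoch).
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Hopping: size 5000 ms, advance 1000 ms. The record belongs to five overlapping windows.
        System.out.println(windowStartsFor(7000L, 5000L, 1000L)); // prints [3000, 4000, 5000, 6000, 7000]
        // Tumbling: advance equals size. The record belongs to exactly one window.
        System.out.println(windowStartsFor(7000L, 5000L, 5000L)); // prints [5000]
    }
}
```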
<p>
In the Kafka Streams DSL users can specify a <b>retention period</b> for the window. This allows Kafka Streams to retain old window buckets for a period of time in order to wait for the late arrival of records whose timestamps fall within the window interval.
If a record arrives after the retention period has passed, the record cannot be processed and is dropped.
</p>
<p>
Late-arriving records are always possible in real-time data streams. However, how late records are handled depends on the effective <a href="#streams_time">time semantics</a>. With processing-time, the semantics are “when the data is being processed”,
which means that the notion of late records is not applicable because, by definition, no record can be late. Hence, late-arriving records can only be considered as such (i.e. as arriving “late”) for event-time or ingestion-time semantics. In both cases,
Kafka Streams is able to properly handle late-arriving records.
</p>
<h4><a id="streams_dsl_joins" href="#streams_dsl_joins">Join multiple streams</a></h4>
A <b>join</b> operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely. In Kafka Streams, you may perform the following join operations:
<ul>
<li><b>KStream-to-KStream Joins</b> are always windowed joins, since otherwise the memory and state required to compute the join would grow infinitely in size. Here, a newly received record from one of the streams is joined with the other stream's records within the specified window interval to produce one result for each matching pair based on user-provided <code>ValueJoiner</code>. A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li>
<li><b>KTable-to-KTable Joins</b> are join operations designed to be consistent with the ones in relational databases. Here, both changelog streams are materialized into local state stores first. When a new record is received from one of the streams, it is joined with the other stream's materialized state stores to produce one result for each matching pair based on user-provided ValueJoiner. A new <code>KTable</code> instance representing the result stream of the join, which is also a changelog stream of the represented table, is returned from this operator.</li>
<li><b>KStream-to-KTable Joins</b> allow you to perform table lookups against a changelog stream (<code>KTable</code>) upon receiving a new record from another record stream (<code>KStream</code>). An example use case would be to enrich a stream of user activities (<code>KStream</code>) with the latest user profile information (<code>KTable</code>). Only records received from the record stream will trigger the join and produce results via <code>ValueJoiner</code>, not vice versa (i.e., records received from the changelog stream will be used only to update the materialized state store). A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li>
<li><b>KStream-to-GlobalKTable Joins</b> allow you to perform table lookups against a fully replicated changelog stream (<code>GlobalKTable</code>) upon receiving a new record from another record stream (<code>KStream</code>).
Joins with a <code>GlobalKTable</code> don't require repartitioning of the input <code>KStream</code> as all partitions of the <code>GlobalKTable</code> are available on every KafkaStreams instance.
The <code>KeyValueMapper</code> provided with the join operation is applied to each KStream record to extract the join-key that is used to do the lookup to the GlobalKTable so non-record-key joins are possible.
An example use case would be to enrich a stream of user activities (<code>KStream</code>) with the latest user profile information (<code>GlobalKTable</code>).
Only records received from the record stream will trigger the join and produce results via <code>ValueJoiner</code>, not vice versa (i.e., records received from the changelog stream will be used only to update the materialized state store).
A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li>
</ul>
Depending on the operands the following join operations are supported: <b>inner joins</b>, <b>outer joins</b> and <b>left joins</b>.
Their <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics">semantics</a> are similar to the corresponding operators in relational databases.
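<p>
The asymmetry of a KStream-to-KTable join (only stream-side records trigger output) can be modeled in plain Java. This is a sketch under stated assumptions: it models inner-join behaviour, uses a <code>HashMap</code> in place of the materialized state store, and the names <code>StreamTableJoinSketch</code>, <code>onTableRecord</code>, and <code>onStreamRecord</code> are hypothetical rather than Kafka Streams APIs.
</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of stream-table join semantics (inner join); not a Kafka API.
final class StreamTableJoinSketch {
    // Stand-in for the materialized state of the table side: user id -> latest profile.
    private final Map<String, String> profiles = new HashMap<>();
    private final List<String> results = new ArrayList<>();

    // A record from the changelog (table) side only updates the state; it emits nothing.
    void onTableRecord(String userId, String profile) {
        profiles.put(userId, profile);
    }

    // A record from the record-stream side triggers a lookup and, on a match, one joined result.
    void onStreamRecord(String userId, String activity) {
        String profile = profiles.get(userId);
        if (profile != null) {
            // This string concatenation plays the role of the user-provided joiner function.
            results.add(userId + ":" + activity + "/" + profile);
        }
    }

    List<String> results() {
        return results;
    }

    static List<String> demo() {
        StreamTableJoinSketch join = new StreamTableJoinSketch();
        join.onStreamRecord("alice", "click");   // no profile yet: nothing is emitted
        join.onTableRecord("alice", "EU");       // updates state only
        join.onStreamRecord("alice", "search");  // joined with "EU"
        join.onTableRecord("alice", "US");       // updates state only
        join.onStreamRecord("alice", "purchase"); // joined with the latest profile, "US"
        return join.results();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [alice:search/EU, alice:purchase/US]
    }
}
```

<p>
A left join would differ only in the no-match case: the stream record would still produce a result, with no table value available to the joiner.
</p>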
<h5><a id="streams_dsl_aggregations" href="#streams_dsl_aggregations">Aggregate a stream</a></h5>
An <b>aggregation</b> operation takes one input stream, and yields a new stream by combining multiple input records into a single output record. Examples of aggregations are computing counts or sums. An aggregation over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the aggregation may grow indefinitely.
<p>
In the Kafka Streams DSL, an input stream of an aggregation can be a <code>KStream</code> or a <code>KTable</code>, but the output stream will always be a <code>KTable</code>.
This allows Kafka Streams to update an aggregate value upon the late arrival of further records after the value was produced and emitted.
When such late arrival happens, the aggregating <code>KStream</code> or <code>KTable</code> simply emits a new aggregate value. Because the output is a <code>KTable</code>, the new value is considered to overwrite the old value with the same key in subsequent processing steps.
</p>
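<p>
The way a late record leads to a new aggregate value that overwrites the old one can be sketched with a windowed count in plain Java. The sketch assumes tumbling windows aligned to the epoch, emits one update per input record, and ignores any caching or retention; <code>WindowedCountSketch</code> and its methods are hypothetical names, not Kafka Streams APIs.
</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a windowed count whose output is a changelog; not a Kafka API.
final class WindowedCountSketch {
    private final long windowSizeMs;
    // Stand-in for the output table: "key@windowStart" -> latest count.
    private final Map<String, Long> counts = new HashMap<>();
    // Every update emitted downstream, in emission order.
    private final List<String> emitted = new ArrayList<>();

    WindowedCountSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Count the record in its tumbling window and emit the new aggregate value.
    void process(String key, long timestampMs) {
        long windowStart = (timestampMs / windowSizeMs) * windowSizeMs;
        String windowedKey = key + "@" + windowStart;
        long updated = counts.merge(windowedKey, 1L, Long::sum);
        emitted.add(windowedKey + "=" + updated);
    }

    Map<String, Long> counts() {
        return counts;
    }

    List<String> emitted() {
        return emitted;
    }

    static WindowedCountSketch demo() {
        WindowedCountSketch sketch = new WindowedCountSketch(5000L);
        sketch.process("alice", 1000L); // window starting at 0
        sketch.process("alice", 6000L); // window starting at 5000
        sketch.process("alice", 2000L); // late record for the window starting at 0
        return sketch;
    }

    public static void main(String[] args) {
        WindowedCountSketch sketch = demo();
        System.out.println(sketch.emitted()); // prints [alice@0=1, alice@5000=1, alice@0=2]
    }
}
```

<p>
The third update carries the same key as the first, so a downstream consumer that treats the output as a table ends up with a count of 2 for that window.
</p>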
<h4><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform a stream</a></h4>
<p>
Besides join and aggregation operations, there is a list of other transformation operations provided for <code>KStream</code> and <code>KTable</code> respectively.
Each of these operations may generate one or more <code>KStream</code> and <code>KTable</code> objects and
can be translated into one or more connected processors in the underlying processor topology.
All these transformation methods can be chained together to compose a complex processor topology.
Since <code>KStream</code> and <code>KTable</code> are strongly typed, all these transformation operations are defined as
generic functions where users can specify the input and output data types.
</p>
<p>
Among these transformations, <code>filter</code>, <code>map</code>, <code>mapValues</code>, etc, are stateless
transformation operations and can be applied to both <code>KStream</code> and <code>KTable</code>,
where users can usually pass a customized function to these functions as a parameter, such as <code>Predicate</code> for <code>filter</code>,
<code>KeyValueMapper</code> for <code>map</code>, etc:
</p>
<pre>
// written in Java 8+, using lambda expressions
KStream&lt;String, Object&gt; mapped = source1.mapValues(record -> record.get("category"));
</pre>
<p>
Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise
they do not require a state store associated with the stream processor. Stateful transformations, on the other hand,
require accessing an associated state for processing and producing outputs.
For example, in <code>join</code> and <code>aggregate</code> operations, a windowing state is usually used to store all the received records
within the defined window boundary so far. The operators can then access these accumulated records in the store and compute
based on them.
</p>
<pre>
// written in Java 8+, using lambda expressions
KTable&lt;Windowed&lt;String&gt;, Long&gt; counts = source1.groupByKey().aggregate(
() -> 0L, // initial value
(aggKey, value, aggregate) -> aggregate + 1L, // aggregating value
TimeWindows.of(5000L).advanceBy(1000L), // window size and advance interval in milliseconds
Serdes.Long(), // serde for aggregated value
"counts" // name of the state store that backs the aggregation
);
KStream&lt;String, String&gt; joined = source1.leftJoin(source2,
(record1, record2) -> record1.get("user") + "-" + record2.get("region")
);
</pre>
<h4><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams back to Kafka</a></h4>
<p>
At the end of the processing, users can choose to (continuously) write the final result streams back to a Kafka topic through
<code>KStream.to</code> and <code>KTable.to</code>.
</p>
<pre>
joined.to("topic4");
</pre>
If your application needs to continue reading and processing the records after they have been materialized
to a topic via <code>to</code> above, one option is to construct a new stream that reads from the output topic;
Kafka Streams provides a convenience method called <code>through</code>:
<pre>
// equivalent to
//
// joined.to("topic4");
// materialized = builder.stream("topic4");
KStream&lt;String, String&gt; materialized = joined.through("topic4");
</pre>
<br>
<h3><a id="streams_execute" href="#streams_execute">Application Configuration and Execution</a></h3>
<p>
Besides defining the topology, developers will also need to configure their applications
in <code>StreamsConfig</code> before running them. A complete list of
Kafka Streams configs can be found <a href="/{{version}}/documentation/#streamsconfigs"><b>here</b></a>.
</p>
<p>
Specifying the configuration in Kafka Streams is similar to the Kafka Producer and Consumer clients. Typically, you create a <code>java.util.Properties</code> instance,
set the necessary parameters, and construct a <code>StreamsConfig</code> instance from the <code>Properties</code> instance.
</p>
<pre>
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
// Any further settings
settings.put(... , ...);
// Create an instance of StreamsConfig from the Properties instance
StreamsConfig config = new StreamsConfig(settings);
</pre>
<p>
Apart from Kafka Streams' own configuration parameters you can also specify parameters for the Kafka consumers and producers that are used internally,
depending on the needs of your application. Similar to the Streams settings you define any such consumer and/or producer settings via <code>StreamsConfig</code>.
Note that some consumer and producer configuration parameters use the same parameter name, for example <code>send.buffer.bytes</code> and <code>receive.buffer.bytes</code>, which
are used to configure TCP buffers, and <code>request.timeout.ms</code> and <code>retry.backoff.ms</code>, which control retries for client requests.
If you want to set different values for consumer and producer for such a parameter, you can prefix the parameter name with <code>consumer.</code> or <code>producer.</code>:
</p>
<pre>
Properties settings = new Properties();
// Example of a "normal" setting for Kafka Streams
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings
settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
// Customize a common client setting for both consumer and producer
settings.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 100L);
// Customize different values for consumer and producer
settings.put("consumer." + ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
settings.put("producer." + ProducerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
// Alternatively, you can use
settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 1024 * 1024);
settings.put(StreamsConfig.producerPrefix(ProducerConfig.RECEIVE_BUFFER_CONFIG), 64 * 1024);
</pre>
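<p>
The effect of the <code>consumer.</code> and <code>producer.</code> prefixes can be illustrated with plain <code>java.util.Properties</code> and literal parameter names. The sketch below assumes that a prefixed entry takes precedence over the unprefixed entry with the same name; <code>PrefixedConfigSketch</code> and <code>effectiveValue</code> are hypothetical helpers written for this illustration and are not part of <code>StreamsConfig</code>.
</p>

```java
import java.util.Properties;

// Plain-Java illustration of prefixed client settings; not the StreamsConfig implementation.
final class PrefixedConfigSketch {
    // Assumed precedence: a value set under the client prefix wins over the shared, unprefixed value.
    static Object effectiveValue(Properties settings, String clientPrefix, String name) {
        Object prefixed = settings.get(clientPrefix + name);
        return prefixed != null ? prefixed : settings.get(name);
    }

    static Properties example() {
        Properties settings = new Properties();
        // Shared setting: applies to both clients because no prefixed variant is set.
        settings.put("retry.backoff.ms", 100L);
        // Different values for the same parameter name, one per client.
        settings.put("consumer.receive.buffer.bytes", 1024 * 1024);
        settings.put("producer.receive.buffer.bytes", 64 * 1024);
        return settings;
    }

    public static void main(String[] args) {
        Properties settings = example();
        System.out.println(effectiveValue(settings, "consumer.", "receive.buffer.bytes")); // prints 1048576
        System.out.println(effectiveValue(settings, "producer.", "receive.buffer.bytes")); // prints 65536
        System.out.println(effectiveValue(settings, "consumer.", "retry.backoff.ms"));     // prints 100
    }
}
```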
<p>
You can call Kafka Streams from anywhere in your application code.
Very commonly though you would do so within the <code>main()</code> method of your application, or some variant thereof.
</p>
<p>
First, you must create an instance of <code>KafkaStreams</code>. The first argument of the <code>KafkaStreams</code> constructor takes a topology
builder (either <code>KStreamBuilder</code> for the Kafka Streams DSL, or <code>TopologyBuilder</code> for the Processor API)
that is used to define a topology; the second argument is an instance of <code>StreamsConfig</code> mentioned above.
</p>
<pre>
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TopologyBuilder;
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
KStreamBuilder builder = ...; // when using the Kafka Streams DSL
//
// OR
//
TopologyBuilder builder = ...; // when using the Processor API
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
</pre>
<p>
At this point, internal structures have been initialized, but the processing is not started yet. You have to explicitly start the Kafka Streams thread by calling the <code>start()</code> method:
</p>
<pre>
// Start the Kafka Streams instance
streams.start();
</pre>
<p>
To catch any unexpected exceptions, you may set a <code>java.lang.Thread.UncaughtExceptionHandler</code> before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:
</p>
<pre>
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
// here you should examine the exception and perform an appropriate action!
}
});
</pre>
<p>
To stop the application instance call the <code>close()</code> method:
</p>
<pre>
// Stop the Kafka Streams instance
streams.close();
</pre>
Now it's time to execute your application that uses the Kafka Streams library. It can be run just like any other Java application; there is no special magic or requirement on the side of Kafka Streams.
For example, you can package your Java application as a fat jar file and then start the application via:
<pre>
# Start the application in class `com.example.MyStreamsApp`
# from the fat jar named `path-to-app-fatjar.jar`.
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
</pre>
<p>
When the application instance starts running, the defined processor topology will be initialized as one or more stream tasks that can be executed in parallel by the stream threads within the instance.
If the processor topology defines any state stores, these state stores will also be (re-)constructed, if possible, during the initialization
period of their associated stream tasks.
It is important to understand that, when starting your application as described above, you are actually launching what Kafka Streams considers to be one instance of your application.
More than one instance of your application may be running at a time, and in fact the common scenario is that there are indeed multiple instances of your application running in parallel (e.g., on another JVM or another machine).
In such cases, Kafka Streams transparently re-assigns tasks from the existing instances to the new instance that you just started.
See <a href="#streams_architecture_tasks">Stream Partitions and Tasks</a> and <a href="#streams_architecture_threads">Threading Model</a> for details.
</p>
<div class="pagination">
<a href="/{{version}}/documentation/streams/architecture" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/upgrade-guide" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
<!--#include virtual="../../includes/_header.htm" -->
<!--#include virtual="../../includes/_top.htm" -->
<div class="content documentation documentation--current">
<!--#include virtual="../../includes/_nav.htm" -->
<div class="right">
<!--#include virtual="../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a></li>
<li><a href="/documentation/streams">Streams</a></li>
</ul>
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>