<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
<h1>Write your own Streams Applications</h1>
<p>
In this guide we will set up your own project from scratch to write a stream processing application using Kafka's Streams API.
If you have not done so already, we highly recommend reading the <a href="/{{version}}/documentation/streams/quickstart">quickstart</a> first, which shows how to run a Streams application written in Kafka Streams.
</p>
<h4><a id="tutorial_maven_setup" href="#tutorial_maven_setup">Setting up a Maven Project</a></h4>
<p>
We are going to use the Kafka Streams Maven archetype to create a Streams project structure, with the following command:
</p>
<pre class="brush: bash;">
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.kafka \
    -DarchetypeArtifactId=streams-quickstart-java \
    -DarchetypeVersion={{fullDotVersion}} \
    -DgroupId=streams.examples \
    -DartifactId=streams.examples \
    -Dversion=0.1 \
    -Dpackage=myapps
</pre>
<p>
You can use different values for the <code>groupId</code>, <code>artifactId</code> and <code>package</code> parameters if you like.
Assuming the above parameter values are used, this command will create a project structure that looks like this:
</p>
<pre class="brush: bash;">
&gt; tree streams.examples
streams.examples
|-- pom.xml
|-- src
    |-- main
        |-- java
        |   |-- myapps
        |       |-- LineSplit.java
        |       |-- Pipe.java
        |       |-- WordCount.java
        |-- resources
            |-- log4j.properties
</pre>
<p>
There are already several example programs written with the Streams library under <code>src/main/java</code>.
Since we are going to write such programs from scratch, we can delete these examples:
</p>
<pre class="brush: bash;">
&gt; cd streams.examples
&gt; rm src/main/java/myapps/*.java
</pre>
<h4><a id="tutorial_code_pipe" href="#tutorial_code_pipe">Writing a first Streams application: Pipe</a></h4>
It's coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a Java file under <code>src/main/java</code>.
Let's name it <code>Pipe.java</code>:
<pre class="brush: java;">
package myapps;

public class Pipe {

    public static void main(String[] args) throws Exception {

    }
}
</pre>
<p>
We are going to fill in the <code>main</code> function to write this pipe program. Note that we will not list the import statements as we go, since IDEs can usually add them automatically.
However, if you are using a text editor you need to add the imports manually; at the end of this section we'll show the complete code snippet with import statements.
</p>
<p>
The first step to write a Streams application is to create a <code>java.util.Properties</code> map to specify different Streams execution configuration values as defined in <code>StreamsConfig</code>.
A couple of important configuration values you need to set are <code>StreamsConfig.BOOTSTRAP_SERVERS_CONFIG</code>, which specifies a list of host/port pairs to use for establishing the initial connection to the Kafka cluster,
and <code>StreamsConfig.APPLICATION_ID_CONFIG</code>, which gives the unique identifier of your Streams application, distinguishing it from other applications talking to the same Kafka cluster:
</p>
<pre class="brush: java;">
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assuming the Kafka broker this application is talking to runs on the local machine on port 9092
</pre>
<p>
In addition, you can customize other configurations in the same map, for example, default serialization and deserialization libraries for the record key-value pairs:
</p>
<pre class="brush: java;">
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
</pre>
<p>
For a full list of configurations of Kafka Streams please refer to this <a href="/{{version}}/documentation/#streamsconfigs">table</a>.
</p>
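<p>
For example, another setting from that list that you could tune in the same map is the commit interval; the default (30 seconds) is fine for this tutorial, so the line below is shown only as an illustration:
</p>
<pre class="brush: java;">
// commit the processing progress every 1 second instead of the default 30000 ms
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
</pre>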
<p>
Next we will define the computational logic of our Streams application.
In Kafka Streams this computational logic is defined as a <code>topology</code> of connected processor nodes.
We can use a topology builder to construct such a topology:
</p>
<pre class="brush: java;">
final KStreamBuilder builder = new KStreamBuilder();
</pre>
<p>
And then create a source stream from a Kafka topic named <code>streams-plaintext-input</code> using this topology builder:
</p>
<pre class="brush: java;">
KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
</pre>
<p>
Now we get a <code>KStream</code> that is continuously generating records from its source Kafka topic <code>streams-plaintext-input</code>.
The records are organized as <code>String</code> typed key-value pairs.
The simplest thing we can do with this stream is to write it into another Kafka topic, say one named <code>streams-pipe-output</code>:
</p>
<pre class="brush: java;">
source.to("streams-pipe-output");
</pre>
<p>
Note that we can also concatenate the above two lines into a single line as:
</p>
<pre class="brush: java;">
builder.stream("streams-plaintext-input").to("streams-pipe-output");
</pre>
<p>
We can now construct the Streams client with the two components we have just created above: the configuration map and the topology builder object
(one can also construct a <code>StreamsConfig</code> object from the <code>props</code> map and then pass that object to the constructor;
<code>KafkaStreams</code> has overloaded constructors that take either type).
</p>
<pre class="brush: java;">
final KafkaStreams streams = new KafkaStreams(builder, props);
</pre>
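<p>
For illustration, a minimal sketch of the equivalent construction via an explicit <code>StreamsConfig</code> object, using the overloaded constructor mentioned above:
</p>
<pre class="brush: java;">
// construct a StreamsConfig from the same props map and pass it to the overloaded constructor
StreamsConfig config = new StreamsConfig(props);
final KafkaStreams streams = new KafkaStreams(builder, config);
</pre>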
<p>
By calling its <code>start()</code> function we can trigger the execution of this client.
The execution won't stop until <code>close()</code> is called on this client.
We can, for example, add a shutdown hook with a countdown latch to capture a user interrupt and close the client upon terminating this program:
</p>
<pre class="brush: java;">
final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});

try {
    streams.start();
    latch.await();
} catch (Throwable e) {
    System.exit(1);
}
System.exit(0);
<p>
The complete code so far looks like this:
</p>
<pre class="brush: java;">
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final KStreamBuilder builder = new KStreamBuilder();

        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final KafkaStreams streams = new KafkaStreams(builder, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
</pre>
<p>
If you already have the Kafka broker up and running at <code>localhost:9092</code>,
and the topics <code>streams-plaintext-input</code> and <code>streams-pipe-output</code> created on that broker,
you can run this code in your IDE or on the command line, using Maven:
</p>
<pre class="brush: brush;">
&gt; mvn clean package
&gt; mvn exec:java -Dexec.mainClass=myapps.Pipe
</pre>
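<p>
If you have not created these topics yet, a minimal sketch using the <code>kafka-topics</code> tool that ships with the broker (assuming a single local broker with ZooKeeper at <code>localhost:2181</code>, as in the quickstart) looks like this:
</p>
<pre class="brush: bash;">
&gt; bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input
&gt; bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-pipe-output
</pre>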
<p>
For detailed instructions on how to run a Streams application and observe its computing results,
please read the <a href="/{{version}}/documentation/streams/quickstart">Play with a Streams Application</a> section.
We will not cover this in the rest of this section.
</p>
<h4><a id="tutorial_code_linesplit" href="#tutorial_code_linesplit">Writing a second Streams application: Line Split</a></h4>
<p>
We have learned how to construct a Streams client with its two key components: the <code>StreamsConfig</code> and the <code>KStreamBuilder</code>.
Now let's move on to add some real processing logic by augmenting the current topology.
We can create another program by copying the existing <code>Pipe.java</code> class:
</p>
<pre class="brush: brush;">
&gt; cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java
</pre>
<p>
Then change its class name as well as the application id config to distinguish it from the original program:
</p>
<pre class="brush: java;">
public class LineSplit {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        // ...
    }
}
}
</pre>
<p>
Since each record of the source stream is a <code>String</code> typed key-value pair,
let's treat the value string as a text line and split it into words with a <code>flatMapValues</code> operator:
</p>
<pre class="brush: java;">
KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
KStream&lt;String, String&gt; words = source.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
    @Override
    public Iterable&lt;String&gt; apply(String value) {
        return Arrays.asList(value.split("\\W+"));
    }
});
</pre>
<p>
The operator will take the <code>source</code> stream as its input and generate a new stream named <code>words</code>:
it processes each record from its source stream in order, breaks its value string into a list of words, and produces
each word as a new record to the output <code>words</code> stream.
This is a stateless operator that does not need to keep track of any previously received records or processed results.
Note that if you are using JDK 8 you can use a lambda expression to simplify the above code:
</p>
<pre class="brush: java;">
KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
KStream&lt;String, String&gt; words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
</pre>
<p>
Finally we can write the word stream back into another Kafka topic, say <code>streams-linesplit-output</code>.
Again, these two steps can be concatenated as follows (assuming lambda expressions are used):
</p>
<pre class="brush: java;">
KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
      .to("streams-linesplit-output");
</pre>
<p>
The complete code looks like this (assuming lambda expression is used):
</p>
<pre class="brush: java;">
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class LineSplit {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final KStreamBuilder builder = new KStreamBuilder();

        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
              .to("streams-linesplit-output");

        final KafkaStreams streams = new KafkaStreams(builder, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // ... same as Pipe.java above
    }
}
</pre>
<h4><a id="tutorial_code_wordcount" href="#tutorial_code_wordcount">Writing a third Streams application: Wordcount</a></h4>
<p>
Let's now take a step further and add some "stateful" computations to the topology by counting the occurrences of the words split from the source text stream.
Following similar steps, let's create another program based on the <code>LineSplit.java</code> class:
</p>
<pre class="brush: java;">
public class WordCount {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        // ...
    }
}
</pre>
<p>
In order to count the words we can first modify the <code>flatMapValues</code> operator to treat all of the words as lower case:
</p>
<pre class="brush: java;">
source.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
    @Override
    public Iterable&lt;String&gt; apply(String value) {
        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
    }
});
</pre>
<p>
In order to do the counting aggregation we have to first specify that we want to key the stream on the value string, i.e. the lower-cased word, with a <code>groupBy</code> operator.
This operator generates a new grouped stream, which can then be aggregated by a <code>count</code> operator that generates a running count on each of the grouped keys:
</p>
<pre class="brush: java;">
KTable&lt;String, Long&gt; counts =
    source.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
              @Override
              public Iterable&lt;String&gt; apply(String value) {
                  return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
              }
          })
          .groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
              @Override
              public String apply(String key, String value) {
                  return value;
              }
          })
          .count("Counts");
</pre>
<p>
Note that the <code>count</code> operator takes a <code>String</code> typed parameter <code>Counts</code>, which names the state store
that holds the running counts; this store keeps being updated as more records are piped and processed from the source Kafka topic.
The <code>Counts</code> store can be queried in real-time, with details described in the <a href="/{{version}}/documentation/streams/developer-guide#streams_interactive_queries">Developer Manual</a>.
</p>
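<p>
For illustration, a minimal sketch of querying the <code>Counts</code> store from a running <code>streams</code> instance (assuming the application has reached the RUNNING state; the full interactive-queries API is covered in the Developer Manual):
</p>
<pre class="brush: java;">
// fetch a read-only view of the state store registered under the name "Counts";
// requires imports of QueryableStoreTypes and ReadOnlyKeyValueStore from org.apache.kafka.streams.state
ReadOnlyKeyValueStore&lt;String, Long&gt; keyValueStore =
    streams.store("Counts", QueryableStoreTypes.keyValueStore());
System.out.println("current count for 'hello': " + keyValueStore.get("hello"));
</pre>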
<p>
We can also write the <code>counts</code> KTable's changelog stream back into another Kafka topic, say <code>streams-wordcount-output</code>.
Note that this time the value type is no longer <code>String</code> but <code>Long</code>, so the default serialization classes are no longer viable for writing the result to Kafka.
We need to provide overridden serialization methods for <code>Long</code> types, otherwise a runtime exception will be thrown:
</p>
<pre class="brush: java;">
counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
</pre>
<p>
Note that in order to read the changelog stream from topic <code>streams-wordcount-output</code>,
one needs to set the value deserializer to <code>org.apache.kafka.common.serialization.LongDeserializer</code>, as sketched after the code below.
Details of this can be found in the <a href="/{{version}}/documentation/streams/quickstart">Play with a Streams Application</a> section.
Assuming that lambda expressions from JDK 8 can be used, the above code can be simplified as:
</p>
<pre class="brush: java;">
KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
      .groupBy((key, value) -> value)
      .count("Counts")
      .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
</pre>
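<p>
As noted above, reading the output topic requires overriding the value deserializer; a sketch of doing so with the console consumer that ships with Kafka (assuming a broker at <code>localhost:9092</code>):
</p>
<pre class="brush: bash;">
&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --property print.key=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
</pre>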
<p>
The complete code looks like this (assuming lambda expression is used):
</p>
<pre class="brush: java;">
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCount {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final KStreamBuilder builder = new KStreamBuilder();

        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
              .groupBy((key, value) -> value)
              .count("Counts")
              .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

        final KafkaStreams streams = new KafkaStreams(builder, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // ... same as Pipe.java above
    }
}
</pre>
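<p>
As with the Pipe example, you can build and run this program with Maven, assuming the topics it reads from and writes to already exist:
</p>
<pre class="brush: bash;">
&gt; mvn clean package
&gt; mvn exec:java -Dexec.mainClass=myapps.WordCount
</pre>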
<div class="pagination">
<a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
<div class="p-quickstart-streams"></div>
<!--#include virtual="../../includes/_header.htm" -->
<!--#include virtual="../../includes/_top.htm" -->
<div class="content documentation documentation--current">
<!--#include virtual="../../includes/_nav.htm" -->
<div class="right">
<!--#include virtual="../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a></li>
<li><a href="/documentation/streams">Streams</a></li>
</ul>
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>