Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology.
{{< carousel >}} {{< carousel-item title="1. Intro to Streams" active="true" >}} {{% youtube "ni3XPsYC5cQ" %}} {{< /carousel-item >}} {{< carousel-item title="2. Creating a Streams Application" >}} {{% youtube "9ZhsnXM2OVM" %}} {{< /carousel-item >}} {{< carousel-item title="3. Transforming Data Pt. 1" >}} {{% youtube "SYmqwvE8umM" %}} {{< /carousel-item >}} {{< carousel-item title="4. Transforming Data Pt. 2" >}} {{% youtube "Vk55Kl9x_Fw" %}} {{< /carousel-item >}} {{< /carousel >}}
{{< about/kstreams-users >}}
The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale.
{{< tabpane >}} {{% tab header="Java" %}}
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Arrays; import java.util.Properties; public class WordCountApplication { public static void main(final String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> textLines = builder.stream("TextLinesTopic"); KTable<String, Long> wordCounts = textLines .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\W+"))) .groupBy((key, word) -> word) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")); wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long())); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); } }
{{% /tab %}} {{% tab header="Scala" %}}
import java.time.Duration
import java.util.Properties

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

/** Reads lines of text from "TextLinesTopic".
  * Splits each line into lower-cased words and keeps a running count per word
  * in the "counts-store" state store.
  * Writes the updated counts to "WordsWithCountsTopic".
  */
object WordCountApplication extends App {
  // Implicit String/Long serdes. ImplicitConversions derives the Consumed, Grouped,
  // and Produced instances the Scala DSL calls below need from these serdes.
  // NOTE(review): newer Kafka releases move this object to
  // org.apache.kafka.streams.scala.serialization.Serdes — confirm against the target version.
  import Serdes._

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    p
  }

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
  val wordCounts: KTable[String, Long] = textLines
    // The Scala literal "\\W+" is the regex \W+ (runs of non-word characters).
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    // Resolves to the Scala DSL's Materialized (from scala.kstream._), which takes the
    // key/value serdes implicitly. Importing the Java Materialized as well would make
    // this name ambiguous.
    .count()(Materialized.as("counts-store"))
  wordCounts.toStream.to("WordsWithCountsTopic")

  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()

  // On JVM shutdown, give the client up to 10 seconds to close cleanly.
  sys.ShutdownHookThread {
    streams.close(Duration.ofSeconds(10))
  }
}
{{% /tab %}} {{< /tabpane >}}