| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <script><!--#include virtual="../js/templateData.js" --></script> |
| |
| <script id="streams-template" type="text/x-handlebars-template"> |
| <h1>Kafka Streams API</h1> |
| |
| <h3 style="max-width: 75rem;">The easiest way to write mission-critical real-time applications and microservices with all the benefits of Kafka's server-side cluster technology.</h3> |
| |
| <div class="hero"> |
| <div class="hero__diagram"> |
| <img src="/{{version}}/images/streams-welcome.png" /> |
| </div> |
| <div class="hero__cta"> |
| <a href="/{{version}}/documentation/streams/tutorial" class="btn">Write your first app</a> |
| <a href="/{{version}}/documentation/streams/quickstart" class="btn">Play with demo app</a> |
| </div> |
| </div> |
| |
| <ul class="feature-list"> |
| <li>Write standard Java applications</li> |
| <li>Exactly-once processing semantics</li> |
| <li>No seperate processing cluster required</li> |
| <li>Develop on Mac, Linux, Windows</li> |
| <li>Elastic, highly scalable, fault-tolerant</li> |
| <li>Deploy to containers, VMs, bare metal, cloud</li> |
| <li>Equally viable for small, medium, & large use cases</li> |
| <li>Fully integrated with Kafka security</li> |
| </ul> |
| |
| <div class="cards"> |
| <a class="card" href="/{{version}}/documentation/streams/developer-guide"> |
| <img class="card__icon" src="/{{version}}/images/icons/documentation.png" /> |
| <img class="card__icon card__icon--hover" src="/{{version}}/images/icons/documentation--white.png" /> |
| <span class="card__label">Developer manual</span> |
| </a> |
| <a class="card" href="/{{version}}/documentation/streams/tutorial"> |
| <img class="card__icon" src="/{{version}}/images/icons/tutorials.png" /> |
| <img class="card__icon card__icon--hover" src="/{{version}}/images/icons/tutorials--white.png" /> |
| <span class="card__label">Tutorials</span> |
| </a> |
| <a class="card" href="/{{version}}/documentation/streams/core-concepts"> |
| <img class="card__icon" src="/{{version}}/images/icons/architecture.png" /> |
| <img class="card__icon card__icon--hover" src="/{{version}}/images/icons/architecture--white.png" /> |
| <span class="card__label">Concepts</span> |
| </a> |
| </div> |
| |
| <h3>Hello Kafka Streams</h3> |
| <p>The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale</p> |
| |
| <div class="code-example"> |
| <div class="btn-group"> |
| <a class="selected b-java-8" data-section="java-8">Java 8+</a> |
| <a class="b-java-7" data-section="java-7">Java 7</a> |
| <a class="b-scala" data-section="scala">Scala</a> |
| </div> |
| |
| <div class="code-example__snippet b-java-8 selected"> |
| <pre class="line-numbers"><code class="language-java"> import org.apache.kafka.common.serialization.Serdes; |
| import org.apache.kafka.streams.KafkaStreams; |
| import org.apache.kafka.streams.StreamsConfig; |
| import org.apache.kafka.streams.kstream.KStream; |
| import org.apache.kafka.streams.kstream.KStreamBuilder; |
| import org.apache.kafka.streams.kstream.KTable; |
| |
| import java.util.Arrays; |
| import java.util.Properties; |
| |
| public class WordCountApplication { |
| |
| public static void main(final String[] args) throws Exception { |
| Properties config = new Properties(); |
| config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); |
| config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); |
| config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| |
| KStreamBuilder builder = new KStreamBuilder(); |
| KStream<String, String> textLines = builder.stream("TextLinesTopic"); |
| KTable<String, Long> wordCounts = textLines |
| .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))) |
| .groupBy((key, word) -> word) |
| .count("Counts"); |
| wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic"); |
| |
| KafkaStreams streams = new KafkaStreams(builder, config); |
| streams.start(); |
| } |
| |
| } |
| </code></pre> |
| </div> |
| |
| <div class="code-example__snippet b-java-7"> |
| <pre class="line-numbers"><code class="language-java"> import org.apache.kafka.common.serialization.Serdes; |
| import org.apache.kafka.streams.KafkaStreams; |
| import org.apache.kafka.streams.StreamsConfig; |
| import org.apache.kafka.streams.kstream.KStream; |
| import org.apache.kafka.streams.kstream.KStreamBuilder; |
| import org.apache.kafka.streams.kstream.KTable; |
| import org.apache.kafka.streams.kstream.KeyValueMapper; |
| import org.apache.kafka.streams.kstream.ValueMapper; |
| |
| import java.util.Arrays; |
| import java.util.Properties; |
| |
| public class WordCountApplication { |
| |
| public static void main(final String[] args) throws Exception { |
| Properties config = new Properties(); |
| config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); |
| config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); |
| config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| |
| KStreamBuilder builder = new KStreamBuilder(); |
| KStream<String, String> textLines = builder.stream("TextLinesTopic"); |
| KTable<String, Long> wordCounts = textLines |
| .flatMapValues(new ValueMapper<String, Iterable<String>>() { |
| @Override |
| public Iterable<String> apply(String textLine) { |
| return Arrays.asList(textLine.toLowerCase().split("\\W+")); |
| } |
| }) |
| .groupBy(new KeyValueMapper<String, String, String>() { |
| @Override |
| public String apply(String key, String word) { |
| return word; |
| } |
| }) |
| .count("Counts"); |
| wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic"); |
| |
| KafkaStreams streams = new KafkaStreams(builder, config); |
| streams.start(); |
| } |
| |
| } |
| </code></pre> |
| </div> |
| |
| <div class="code-example__snippet b-scala"> |
| <pre class="line-numbers"><code class="language-scala"> import java.lang.Long |
| import java.util.Properties |
| import java.util.concurrent.TimeUnit |
| |
| import org.apache.kafka.common.serialization._ |
| import org.apache.kafka.streams._ |
| import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable} |
| |
| import scala.collection.JavaConverters.asJavaIterableConverter |
| |
| object WordCountApplication { |
| |
| def main(args: Array[String]) { |
| val config: Properties = { |
| val p = new Properties() |
| p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application") |
| p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092") |
| p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass) |
| p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass) |
| p |
| } |
| |
| val builder: KStreamBuilder = new KStreamBuilder() |
| val textLines: KStream[String, String] = builder.stream("TextLinesTopic") |
| val wordCounts: KTable[String, Long] = textLines |
| .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava) |
| .groupBy((_, word) => word) |
| .count("Counts") |
| wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic") |
| |
| val streams: KafkaStreams = new KafkaStreams(builder, config) |
| streams.start() |
| |
| Runtime.getRuntime.addShutdownHook(new Thread(() => { |
| streams.close(10, TimeUnit.SECONDS) |
| })) |
| } |
| |
| } |
| </code></pre> |
| </div> |
| </div> |
| <h3 class="customer-title">See how Kafka Streams is being used</h3> |
| <div class="customer__cards"> |
| <div class="customer__card"> |
| <div class="customer__card__icon"> |
| <img src="/{{version}}/images/icons/rabobank.png"> |
| </div> |
| <span class="customer__card__label">Rabobank is one of the 3 largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka and Kafka Streams. |
| </span> |
| <a href="https://www.confluent.io/blog/real-time-financial-alerts-rabobank-apache-kafkas-streams-api/">Learn More</a> |
| </div> |
| <div class="customer__card customer-right" > |
| <div class="customer__card__icon" > |
| <img src="/{{version}}/images/icons/zalando.png"> |
| </div> |
| <span class="customer__card__label">As the leading online fashion retailer in Europe, Zalando uses Kafka as an ESB (Enterprise Service Bus), which helps us in transitioning from a monolithic to a micro services architecture. Using Kafka for processing event streams enables our technical team to do near-real time business intelligence.</span> |
| <a href="https://kafka-summit.org/sessions/using-kstreams-ktables-calculate-real-time-domain-rankings/">Learn More</a> |
| </div> |
| </div> |
| |
| <div class="pagination"> |
| <a href="#" class="pagination__btn pagination__btn__prev pagination__btn--disabled">Previous</a> |
| <a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__next">Next</a> |
| </div> |
| </script> |
| |
| <!--#include virtual="../../includes/_header.htm" --> |
| <!--#include virtual="../../includes/_top.htm" --> |
| <div class="content documentation documentation--current"> |
| <!--#include virtual="../../includes/_nav.htm" --> |
| <div class="right"> |
| <!--#include virtual="../../includes/_docs_banner.htm" --> |
| <ul class="breadcrumbs"> |
| <li><a href="/documentation">Documentation</a></li> |
| </ul> |
| <div class="p-streams"></div> |
| </div> |
| </div> |
| <!--#include virtual="../../includes/_footer.htm" --> |
| |
| <script> |
| $(function() { |
| // Show selected style on nav item |
| $('.b-nav__streams').addClass('selected'); |
| |
| // Display docs subnav items |
| $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded'); |
| |
| // Show selected code example |
| $('.btn-group a').click(function(){ |
| var targetClass = '.b-' + $(this).data().section; |
| $('.code-example__snippet, .btn-group a').removeClass('selected'); |
| $(targetClass).addClass('selected'); |
| }); |
| }); |
| </script> |