<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
<!-- h1>Developer Guide for Kafka Streams</h1 -->
<div class="sub-nav-sticky">
<div class="sticky-top">
<!-- div style="height:35px">
<a href="/{{version}}/documentation/streams/">Introduction</a>
<a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
<a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
<a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
<a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
</div -->
</div>
</div>
<div class="section" id="running-streams-applications">
<span id="streams-developer-guide-execution"></span><h1>Running Streams Applications<a class="headerlink" href="#running-streams-applications" title="Permalink to this headline"></a></h1>
<p>You can run Java applications that use the Kafka Streams library without any additional configuration or requirements. Kafka Streams
can also notify your application when it transitions between its various states, and you can monitor its runtime
status as discussed in <a class="reference internal" href="/documentation/#kafka_streams_monitoring"><span class="std std-ref">the monitoring guide</span></a>.</p>
<div class="contents local topic" id="table-of-contents">
<p class="topic-title first"><b>Table of Contents</b></p>
<ul class="simple">
<li><a class="reference internal" href="#starting-a-kafka-streams-application" id="id3">Starting a Kafka Streams application</a></li>
<li><a class="reference internal" href="#elastic-scaling-of-your-application" id="id4">Elastic scaling of your application</a><ul>
<li><a class="reference internal" href="#adding-capacity-to-your-application" id="id5">Adding capacity to your application</a></li>
<li><a class="reference internal" href="#removing-capacity-from-your-application" id="id6">Removing capacity from your application</a></li>
<li><a class="reference internal" href="#state-restoration-during-workload-rebalance" id="id7">State restoration during workload rebalance</a></li>
<li><a class="reference internal" href="#determining-how-many-application-instances-to-run" id="id8">Determining how many application instances to run</a></li>
</ul>
</li>
</ul>
</div>
<div class="section" id="starting-a-kafka-streams-application">
<span id="streams-developer-guide-execution-starting"></span><h2><a class="toc-backref" href="#id3">Starting a Kafka Streams application</a><a class="headerlink" href="#starting-a-kafka-streams-application" title="Permalink to this headline"></a></h2>
<p>You can package your Java application as a fat JAR file and then start the application like this:</p>
<div class="highlight-bash"><div class="highlight"><pre><span></span><span class="c1"># Start the application in class `com.example.MyStreamsApp`</span>
<span class="c1"># from the fat JAR named `path-to-app-fatjar.jar`.</span>
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp</pre></div>
</div>
<p>Each time you start your application, you launch one Kafka Streams instance of it. You can run multiple
instances of the same application, and a common scenario is to run several instances in parallel, for example on
different machines. For more information, see <a class="reference internal" href="../architecture.html#streams_architecture_tasks"><span class="std std-ref">Parallelism Model</span></a>.</p>
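<p>Instances belong to the same application because they share the same <code>application.id</code>, which Kafka Streams uses as the consumer group id. The sketch below is a hypothetical helper that uses only <code>java.util.Properties</code> to build the two settings every instance needs; in a real application these properties would be passed, together with the topology, to the <code>KafkaStreams</code> constructor. The helper name, application id, and broker address are placeholders.</p>

```java
import java.util.Properties;

// Hypothetical helper for illustration only: builds the configuration for one
// instance of an application. All instances of the same application share the
// same application.id, which Kafka Streams uses as the consumer group id.
class InstanceConfig {
    static Properties forInstance(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);      // identical on every instance
        props.setProperty("bootstrap.servers", bootstrapServers); // Kafka cluster to connect to
        return props;
    }

    public static void main(String[] args) {
        // Two instances of the same application, e.g. started on two machines
        // from the same fat JAR; the ids and broker address are placeholders.
        Properties first = forInstance("my-streams-app", "broker1:9092");
        Properties second = forInstance("my-streams-app", "broker1:9092");
        System.out.println(first.getProperty("application.id")
                .equals(second.getProperty("application.id"))); // prints true
    }
}
```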
<p>When an application instance starts, the processor topology you defined is initialized as one or more stream tasks.
If the processor topology defines any state stores, they are also constructed during initialization. For
more information, see the <a class="reference internal" href="#streams-developer-guide-execution-scaling-state-restoration"><span class="std std-ref">State restoration during workload rebalance</span></a> section.</p>
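<p>As a rough illustration of where the tasks come from, the sketch below derives the tasks of a single sub-topology from the partition counts of its input topics: Kafka Streams creates as many tasks as the largest partition count among those topics, and a task id combines the sub-topology id and the partition number (for example <code>0_3</code>). The class and method names are hypothetical; the real task creation happens inside Kafka Streams.</p>

```java
// Hypothetical sketch: derive the stream tasks for one sub-topology from the
// partition counts of its input topics. One task is created per partition
// number, i.e. max(partition counts) tasks, named subtopologyId_partition.
class TaskLayout {
    static int taskCount(int[] inputTopicPartitions) {
        int max = 0;
        for (int p : inputTopicPartitions) {
            max = Math.max(max, p);
        }
        return max;
    }

    static String[] taskIds(int subtopologyId, int[] inputTopicPartitions) {
        int count = taskCount(inputTopicPartitions);
        String[] ids = new String[count];
        for (int partition = 0; partition < count; partition++) {
            ids[partition] = subtopologyId + "_" + partition;
        }
        return ids;
    }

    public static void main(String[] args) {
        // Sub-topology 0 reads two topics with 4 and 6 partitions: 6 tasks.
        System.out.println(String.join(",", taskIds(0, new int[] {4, 6})));
        // prints 0_0,0_1,0_2,0_3,0_4,0_5
    }
}
```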
</div>
<div class="section" id="elastic-scaling-of-your-application">
<span id="streams-developer-guide-execution-scaling"></span><h2><a class="toc-backref" href="#id4">Elastic scaling of your application</a><a class="headerlink" href="#elastic-scaling-of-your-application" title="Permalink to this headline"></a></h2>
<p>Kafka Streams makes your stream processing applications elastic and scalable. You can add and remove processing capacity
dynamically while your application is running, without any downtime or data loss. This makes your applications
resilient in the face of failures and allows you to perform maintenance as needed (e.g. rolling upgrades).</p>
<p>For more information about this elasticity, see the <a class="reference internal" href="../architecture.html#streams_architecture_tasks"><span class="std std-ref">Parallelism Model</span></a> section. Kafka Streams
uses Kafka's group management functionality, which is built right into the <a class="reference external" href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">Kafka wire protocol</a>. Group management is the foundation that enables the
elasticity of Kafka Streams applications: the members of a group coordinate to share the consumption and
processing of data in Kafka. Additionally, Kafka Streams supports stateful processing and keeps that state fault-tolerant
in environments where application instances may come and go at any time.</p>
<div class="section" id="adding-capacity-to-your-application">
<h3><a class="toc-backref" href="#id5">Adding capacity to your application</a><a class="headerlink" href="#adding-capacity-to-your-application" title="Permalink to this headline"></a></h3>
<p>If you need more processing capacity for your stream processing application, you can simply start another instance of it, e.g. on another machine, in order to scale out. The instances of your application will become aware of each other and automatically begin to share the processing work. More specifically, some of the stream tasks that have been run by the existing instances are handed over to the new instances. Moving a stream task from one instance to another moves both its processing work and any internal state of that task: the state of a stream task is re-created in the target instance by restoring it from the task's corresponding changelog topic.</p>
<p>The various instances of your application each run in their own JVM process, which means that each instance can leverage all the processing capacity that is available to their respective JVM process (minus the capacity that any non-Kafka-Streams part of your application may be using). This explains why running additional instances will grant your application additional processing capacity. The exact capacity you will be adding by running a new instance depends of course on the environment in which the new instance runs: available CPU cores, available main memory and Java heap space, local storage, network bandwidth, and so on. Similarly, if you stop any of the running instances of your application, then you are removing and freeing up the respective processing capacity.</p>
<div class="figure align-center" id="id1">
<img class="centered" src="/{{version}}/images/streams-elastic-scaling-1.png">
<p class="caption"><span class="caption-text">Before adding capacity: only a single instance of your Kafka Streams application is running. At this point the corresponding Kafka consumer group of your application contains only a single member (this instance). All data is being read and processed by this single instance.</span></p>
</div>
<div class="figure align-center" id="id2">
<img class="centered" src="/{{version}}/images/streams-elastic-scaling-2.png">
<p class="caption"><span class="caption-text">After adding capacity: now two additional instances of your Kafka Streams application are running, and they have automatically joined the application&#8217;s Kafka consumer group for a total of three current members. These three instances automatically split the processing work among themselves. The split is based on the Kafka topic partitions from which data is being read.</span></p>
</div>
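<p>The effect shown in the two figures can be approximated with a deliberately simplified round-robin assignment of tasks to instances. This is not the actual Kafka Streams assignor, which also takes previous ownership, local state, and lag into account; it only shows how the same set of tasks is spread over more instances once capacity is added. The class and method names are hypothetical.</p>

```java
import java.util.Arrays;

// Simplified, hypothetical round-robin assignment; the real Kafka Streams
// assignor also considers previous ownership, local state and lag.
class RoundRobinAssignment {
    // owner[t] is the index of the instance that runs task t.
    static int[] assign(int taskCount, int instanceCount) {
        int[] owner = new int[taskCount];
        for (int task = 0; task < taskCount; task++) {
            owner[task] = task % instanceCount;
        }
        return owner;
    }

    // Number of tasks each instance runs under the given assignment.
    static int[] load(int[] owner, int instanceCount) {
        int[] counts = new int[instanceCount];
        for (int instance : owner) {
            counts[instance]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int tasks = 6; // e.g. one input topic with 6 partitions
        // Before adding capacity: a single instance runs every task.
        System.out.println(Arrays.toString(load(assign(tasks, 1), 1))); // [6]
        // After starting two more instances: the tasks are split three ways.
        System.out.println(Arrays.toString(load(assign(tasks, 3), 3))); // [2, 2, 2]
    }
}
```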
</div>
<div class="section" id="removing-capacity-from-your-application">
<h3><a class="toc-backref" href="#id6">Removing capacity from your application</a><a class="headerlink" href="#removing-capacity-from-your-application" title="Permalink to this headline"></a></h3>
<p>To remove processing capacity, you can stop some of the running instances of your stream processing application
(e.g., shut down two of the four instances). The stopped instances automatically leave the application&#8217;s consumer group, and the remaining instances of
your application automatically take over the processing work. Specifically, the remaining instances take over the stream tasks that
were run by the stopped instances. Moving stream tasks from one instance to another results in moving the processing
work plus any internal state of these stream tasks. The state of a stream task is re-created in the target instance
from its changelog topic.</p>
<div class="figure align-center">
<img class="centered" src="/{{version}}/images/streams-elastic-scaling-3.png">
</div>
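<p>A simplified way to picture the takeover is a "sticky" reassignment: tasks on surviving instances stay where they are, and each task of a stopped instance moves to the surviving instance that currently runs the fewest tasks. This hypothetical sketch is not the Kafka Streams assignor; it only illustrates shutting down two of four instances.</p>

```java
import java.util.Arrays;

// Simplified, hypothetical sketch: when instances stop, tasks on surviving
// instances stay put and each orphaned task moves to the surviving instance
// that currently runs the fewest tasks. Not the actual Kafka Streams assignor.
class ReassignOnShutdown {
    static int[] reassign(int[] owner, boolean[] stopped) {
        // Current load of every surviving instance.
        int[] counts = new int[stopped.length];
        for (int instance : owner) {
            if (!stopped[instance]) {
                counts[instance]++;
            }
        }
        int[] result = owner.clone();
        for (int task = 0; task < result.length; task++) {
            if (!stopped[result[task]]) {
                continue; // task keeps running where it is
            }
            int target = -1;
            for (int i = 0; i < stopped.length; i++) {
                if (!stopped[i] && (target == -1 || counts[i] < counts[target])) {
                    target = i;
                }
            }
            if (target == -1) {
                throw new IllegalArgumentException("no instance left to take over tasks");
            }
            result[task] = target;
            counts[target]++;
        }
        return result;
    }

    public static void main(String[] args) {
        // Eight tasks spread over four instances (0..3); instances 2 and 3 are shut down.
        int[] owner = {0, 1, 2, 3, 0, 1, 2, 3};
        boolean[] stopped = {false, false, true, true};
        System.out.println(Arrays.toString(reassign(owner, stopped))); // [0, 1, 0, 1, 0, 1, 0, 1]
    }
}
```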
</div>
<div class="section" id="state-restoration-during-workload-rebalance">
<span id="streams-developer-guide-execution-scaling-state-restoration"></span><h3><a class="toc-backref" href="#id7">State restoration during workload rebalance</a><a class="headerlink" href="#state-restoration-during-workload-rebalance" title="Permalink to this headline"></a></h3>
<p>When a task is migrated, the task processing state is fully restored before the application instance resumes
processing the task. This guarantees correct processing results. In Kafka Streams, state restoration is usually done by
replaying the corresponding changelog topic to reconstruct the state store. To reduce changelog-based restoration
latency, you can specify <code class="docutils literal"><span class="pre">num.standby.replicas</span></code>, which maintains replicated local state stores (standby replicas) on other application instances. When a stream task is
initialized or re-initialized on the application instance, its state store is restored like this:</p>
<ul class="simple">
<li>If no local state store exists, the changelog is replayed from the earliest to the current offset. This reconstructs the local state store to the most recent snapshot.</li>
<li>If a local state store exists, the changelog is replayed from the previously checkpointed offset. The changes are applied and the state is restored to the most recent snapshot. This method takes less time because it is applying a smaller portion of the changelog.</li>
</ul>
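<p>The two restoration cases can be summarized as a choice of start offset. In this hypothetical sketch, a missing local state store is modeled as a <code>null</code> checkpoint and offsets are plain numbers; the result is the size of the offset range to replay, which is an upper bound on the number of records if the changelog is compacted.</p>

```java
// Hypothetical sketch of the two restoration cases. A missing local state
// store is modeled as a null checkpoint; offsets are plain longs.
class RestorePlan {
    // Changelog offset from which replay starts.
    static long startOffset(Long checkpointedOffset, long earliestOffset) {
        return checkpointedOffset == null ? earliestOffset : checkpointedOffset;
    }

    // Size of the offset range to replay up to the changelog's current end offset.
    // On a compacted changelog this is an upper bound on the number of records.
    static long offsetsToReplay(Long checkpointedOffset, long earliestOffset, long endOffset) {
        return endOffset - startOffset(checkpointedOffset, earliestOffset);
    }

    public static void main(String[] args) {
        // No local state store: replay the whole changelog, offsets 0 to 1,000,000.
        System.out.println(offsetsToReplay(null, 0L, 1_000_000L));     // 1000000
        // Local store checkpointed at offset 990,000: only the tail is replayed.
        System.out.println(offsetsToReplay(990_000L, 0L, 1_000_000L)); // 10000
    }
}
```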
<p>For more information, see <a class="reference internal" href="config-streams.html#num-standby-replicas"><span class="std std-ref">Standby Replicas</span></a>.</p>
<p>
As of version 2.6, Streams performs most of a task's restoration in the background through warmup replicas. These are assigned to instances that need to restore a lot of state for a task.
A stateful active task is only assigned to an instance whose copy of the task's state is within the configured
<a class="reference internal" href="config-streams.html#acceptable-recovery-lag"><span class="std std-ref"><code>acceptable.recovery.lag</code></span></a>, if such an instance exists. This means that
most of the time, a task migration will <b>not</b> result in downtime for that task. The task remains active on the instance that is already caught up, while the instance that it is being
migrated to works on restoring the state. Streams will <a class="reference internal" href="config-streams.html#probing-rebalance-interval-ms"><span class="std std-ref">regularly probe</span></a> for warmup tasks that have finished restoring and transition them to active tasks when ready.
</p>
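<p>The "caught up" rule can be sketched as follows. This is a simplified, hypothetical helper, not the Kafka Streams assignor: given how far each instance's copy of a task's state lags behind the changelog end, it returns the caught-up instance with the smallest lag, or <code>-1</code> when no instance is within <code>acceptable.recovery.lag</code>. The lag values used below are illustrative.</p>

```java
// Simplified, hypothetical sketch of the "caught up" rule: an instance may run
// the active task only if its lag is within acceptable.recovery.lag.
class ActivePlacement {
    // lags[i]: how many offsets instance i's copy of the task state is behind the
    // changelog end. Returns the caught-up instance with the smallest lag, or -1
    // if no instance is caught up.
    static int caughtUpInstance(long[] lags, long acceptableRecoveryLag) {
        int best = -1;
        for (int i = 0; i < lags.length; i++) {
            boolean caughtUp = lags[i] <= acceptableRecoveryLag;
            if (caughtUp && (best == -1 || lags[i] < lags[best])) {
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        long acceptableRecoveryLag = 10_000L; // illustrative value
        // Instance 0 runs the task (lag 0); instance 1 is still warming up.
        System.out.println(caughtUpInstance(new long[] {0L, 2_000_000L}, acceptableRecoveryLag)); // 0
        // Neither instance is within the acceptable lag.
        System.out.println(caughtUpInstance(new long[] {500_000L, 2_000_000L}, acceptableRecoveryLag)); // -1
    }
}
```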
<p>
Note that the one exception to this task availability is when none of the instances has a caught-up copy of the task's state. In that case, Streams has no choice but to assign the active
task to an instance that is not caught up, and that instance must block further processing of the task until the task's state has been restored from the changelog. If high availability is important
for your application, we highly recommend enabling standby replicas.
</p>
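<p>Enabling standbys is a configuration change. The hypothetical helper below copies an existing configuration and sets <code>num.standby.replicas</code> using only <code>java.util.Properties</code>; in application code the same key is available as <code>StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG</code>. A standby is never placed on the instance that runs the corresponding active task, so it only helps when you run at least two instances.</p>

```java
import java.util.Properties;

// Hypothetical helper: copies an existing Kafka Streams configuration and
// enables standby replicas via the num.standby.replicas setting.
class HighAvailabilityConfig {
    static Properties withStandbys(Properties base, int standbyReplicas) {
        Properties props = new Properties();
        props.putAll(base); // keep application.id, bootstrap.servers, etc.
        props.setProperty("num.standby.replicas", Integer.toString(standbyReplicas));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "my-streams-app");  // placeholder
        base.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        Properties ha = withStandbys(base, 1);
        System.out.println(ha.getProperty("num.standby.replicas")); // 1
    }
}
```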
</div>
<div class="section" id="determining-how-many-application-instances-to-run">
<h3><a class="toc-backref" href="#id8">Determining how many application instances to run</a><a class="headerlink" href="#determining-how-many-application-instances-to-run" title="Permalink to this headline"></a></h3>
<p>The parallelism of a Kafka Streams application is primarily determined by how many partitions the input topics have. For
example, if your application reads from a single topic that has ten partitions, then you can run up to ten instances
of your application. You can run further instances, but these will be idle.</p>
<p>The number of topic partitions is the upper limit for the parallelism of your Kafka Streams application and for the
number of application instances that can actively process data.</p>
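<p>The ten-partition example can be checked with a small sketch that spreads a fixed number of tasks as evenly as possible over identical instances. It is a hypothetical, simplified model (standby replicas are ignored), but it shows why instances beyond the number of tasks receive no work.</p>

```java
import java.util.Arrays;

// Simplified, hypothetical sketch: spread a fixed number of tasks as evenly as
// possible over identical instances (standby replicas are ignored).
class InstanceSizing {
    static int[] tasksPerInstance(int tasks, int instances) {
        int[] load = new int[instances];
        for (int i = 0; i < instances; i++) {
            // The first (tasks % instances) instances get one extra task.
            load[i] = tasks / instances + (i < tasks % instances ? 1 : 0);
        }
        return load;
    }

    static int idleInstances(int tasks, int instances) {
        int idle = 0;
        for (int n : tasksPerInstance(tasks, instances)) {
            if (n == 0) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        int tasks = 10; // one input topic with ten partitions
        System.out.println(Arrays.toString(tasksPerInstance(tasks, 4))); // [3, 3, 2, 2]
        System.out.println(idleInstances(tasks, 10)); // 0
        System.out.println(idleInstances(tasks, 12)); // 2
    }
}
```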
<p>To achieve balanced workload processing across application instances and to prevent processing hotspots, you should
distribute data and processing workloads evenly:</p>
<ul class="simple">
<li>Data should be equally distributed across topic partitions. For example, if two topic partitions each have 1 million messages, this is better than a single partition with 2 million messages and none in the other.</li>
<li>Processing workload should be equally distributed across topic partitions. For example, if the time to process messages varies widely, then it is better to spread the processing-intensive messages across partitions rather than storing these messages within the same partition.</li>
</ul>
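<p>One simple way to quantify how evenly work is distributed is the ratio of the busiest partition to the average partition, where 1.0 means perfectly even. The helper below is a hypothetical sketch of that metric; the per-partition numbers can be message counts or total processing time, and it reproduces the two-partition example above.</p>

```java
// Hypothetical helper: ratio of the busiest partition to the average partition.
// 1.0 means perfectly even; the per-partition numbers can be message counts
// or total processing time, whichever you want to balance.
class PartitionSkew {
    static double skew(long[] perPartition) {
        long total = 0;
        long max = 0;
        for (long n : perPartition) {
            total += n;
            max = Math.max(max, n);
        }
        if (total == 0) {
            return 1.0; // nothing to balance yet
        }
        double mean = (double) total / perPartition.length;
        return max / mean;
    }

    public static void main(String[] args) {
        // Two partitions with 1 million messages each: perfectly balanced.
        System.out.println(skew(new long[] {1_000_000L, 1_000_000L})); // 1.0
        // All 2 million messages in one partition: twice the average load.
        System.out.println(skew(new long[] {2_000_000L, 0L}));         // 2.0
    }
}
```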
</div>
</div>
</div>
</div>
</div>
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/memory-mgmt" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide/manage-topics" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
<!--#include virtual="../../../includes/_header.htm" -->
<!--#include virtual="../../../includes/_top.htm" -->
<div class="content documentation ">
<!--#include virtual="../../../includes/_nav.htm" -->
<div class="right">
<!--//#include virtual="../../../includes/_docs_banner.htm" -->
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
//sticky secondary nav
var $navbar = $(".sub-nav-sticky"),
y_pos = $navbar.offset().top,
height = $navbar.height();
$(window).scroll(function() {
var scrollTop = $(window).scrollTop();
if (scrollTop > y_pos - height) {
$navbar.addClass("navbar-fixed")
} else if (scrollTop <= y_pos) {
$navbar.removeClass("navbar-fixed")
}
});
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>