<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
<!-- h1>Developer Guide for Kafka Streams</h1 -->
<div class="sub-nav-sticky">
<div class="sticky-top">
<!-- div style="height:35px">
<a href="/{{version}}/documentation/streams/">Introduction</a>
<a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
<a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
<a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
<a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
</div -->
</div>
</div>
<div class="section" id="application-reset-tool">
<span id="streams-developer-guide-app-reset"></span><h1>Application Reset Tool<a class="headerlink" href="#application-reset-tool" title="Permalink to this headline"></a></h1>
<p>You can reset an application and force it to reprocess its data from scratch by using the application reset tool.
This can be useful for development and testing, or when fixing bugs.</p>
<p>The application reset tool handles the Kafka Streams <a class="reference internal" href="manage-topics.html#streams-developer-guide-topics-user"><span class="std std-ref">user topics</span></a> (input,
output, and intermediate topics) and <a class="reference internal" href="manage-topics.html#streams-developer-guide-topics-internal"><span class="std std-ref">internal topics</span></a> differently
when resetting the application.</p>
<p>Here&#8217;s what the application reset tool does for each topic type:</p>
<ul class="simple">
<li>Input topics: Reset offsets to the specified position (by default, to the beginning of the topic).</li>
<li>Intermediate topics: Skip to the end of the topic, i.e., set the application&#8217;s committed consumer offsets for all partitions to each partition&#8217;s <code class="docutils literal"><span class="pre">logSize</span></code> (for consumer group <code class="docutils literal"><span class="pre">application.id</span></code>).</li>
<li>Internal topics: Delete the internal topic (this automatically deletes any committed offsets).</li>
</ul>
<p>The application reset tool does not:</p>
<ul class="simple">
<li>Reset output topics of an application. If any output (or intermediate) topics are consumed by downstream
applications, it is your responsibility to adjust those downstream applications as appropriate when you reset the
upstream application.</li>
<li>Reset the local environment of your application instances. It is your responsibility to delete the local
state on any machine on which an application instance was run. See the instructions in section
<a class="reference internal" href="#streams-developer-guide-reset-local-environment"><span class="std std-ref">Step 2: Reset the local environments of your application instances</span></a> on how to do this.</li>
</ul>
<dl class="docutils">
<dt>Prerequisites</dt>
<dd><ul class="first last">
<li><p class="first">All instances of your application must be stopped. Otherwise, the application may enter an invalid state, crash, or produce incorrect results. You can verify whether the consumer group with ID <code class="docutils literal"><span class="pre">application.id</span></code> is still active by using <code class="docutils literal"><span class="pre">bin/kafka-consumer-groups</span></code>.
If a long session timeout has been configured, active members may take longer to expire on the broker, which blocks the reset job from completing. The <code class="docutils literal"><span class="pre">--force</span></code> option removes those left-over members immediately. Make sure to shut down all stream applications when you specify this option, to avoid unexpected rebalances.</p>
</li>
<li><p class="first">Use this tool with care and double-check its parameters: If you provide wrong parameter values (e.g., typos in <code class="docutils literal"><span class="pre">application.id</span></code>) or specify parameters inconsistently (e.g., specify the wrong input topics for the application), this tool might invalidate the application&#8217;s state or even impact other applications, consumer groups, or your Kafka topics.</p>
</li>
<li><p class="first">You should manually delete and re-create any intermediate topics before running the application reset tool, which frees up disk space on the Kafka brokers. You can skip this step if either of the following applies:</p>
<blockquote>
<div><ul class="simple">
<li>You have external downstream consumers for the application&#8217;s intermediate topics.</li>
<li>You are in a development environment where manually deleting and re-creating intermediate topics is unnecessary.</li>
</ul>
</div></blockquote>
</li>
</ul>
</dd>
</dl>
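<p>As a sketch of the first prerequisite, the snippet below assembles a <code>kafka-consumer-groups</code> call that describes the consumer group named after <code>application.id</code>. The install path, broker address, and application ID are hypothetical placeholders, and the <code>RUN</code> variable is a helper introduced only for this example so that the command is printed rather than executed by default. Script names follow this page; in the Apache Kafka distribution they carry a <code>.sh</code> suffix.</p>

```shell
# Hypothetical placeholders; replace them with your own values.
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"
BOOTSTRAP_SERVER="localhost:9092"
APPLICATION_ID="my-streams-app"

# RUN defaults to "echo", so the command is only printed.
# Start the script with RUN set to an empty string (RUN=) to execute it.
RUN="${RUN-echo}"

# Describe the consumer group whose ID equals the application.id.
describe_group() {
  $RUN "$KAFKA_HOME/bin/kafka-consumer-groups" \
    --bootstrap-server "$BOOTSTRAP_SERVER" \
    --describe \
    --group "$APPLICATION_ID"
}

describe_group
```

<p>If the output still shows active members for the group, stop those application instances before you run the reset tool.</p>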
<div class="section" id="step-1-run-the-application-reset-tool">
<h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-run-the-application-reset-tool" title="Permalink to this headline"></a></h2>
<p>Invoke the application reset tool from the command line:</p>
<div class="highlight-bash"><div class="highlight"><pre><span></span>&lt;path-to-kafka&gt;/bin/kafka-streams-application-reset</pre></div>
</div>
<p>The tool accepts the following parameters:</p>
<div class="highlight-bash"><div class="highlight"><pre><span></span>Option <span class="o">(</span>* <span class="o">=</span> required<span class="o">)</span>                 Description
---------------------                 -----------
* --application-id &lt;String: id&gt;       The Kafka Streams application ID
                                        <span class="o">(</span>application.id<span class="o">)</span>.
--bootstrap-servers &lt;String: urls&gt;    Comma-separated list of broker urls with
                                        format: HOST1:PORT1,HOST2:PORT2
                                        <span class="o">(</span>default: localhost:9092<span class="o">)</span>
--by-duration &lt;String&gt;                Reset offsets to offset by duration from
                                        current timestamp. Format: '<span>PnDTnHnMnS</span>'
--config-file &lt;String: file name&gt;     Property file containing configs to be
                                        passed to admin clients and embedded
                                        consumer.
--dry-run                             Display the actions that would be
                                        performed without executing the reset
                                        commands.
--from-file &lt;String&gt;                  Reset offsets to values defined in CSV
                                        file.
--input-topics &lt;String: list&gt;         Comma-separated list of user input
                                        topics. For these topics, the tool will
                                        reset the offset to the earliest
                                        available offset.
--intermediate-topics &lt;String: list&gt;  Comma-separated list of intermediate user
                                        topics <span class="o">(</span>topics used in the through<span class="o">()</span>
                                        method<span class="o">)</span>. For these topics, the tool
                                        will skip to the end.
--shift-by &lt;Long: number-of-offsets&gt;  Reset offsets shifting current offset by
                                        'n', where 'n' can be positive or
                                        negative
--to-datetime &lt;String&gt;                Reset offsets to offset from datetime.
                                        Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest                         Reset offsets to earliest offset.
--to-latest                           Reset offsets to latest offset.
--to-offset &lt;Long&gt;                    Reset offsets to a specific offset.
--zookeeper                           Zookeeper option is deprecated by
                                        bootstrap.servers, as the reset tool
                                        would no longer access Zookeeper
                                        directly.
--force                               Force removing members of the consumer group
                                        (intended to remove left-over members if
                                        long session timeout was configured).</pre></div>
</div>
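<p>To illustrate how these options combine, here is a minimal sketch of a preview run that uses <code>--dry-run</code>, so that the planned actions are displayed without being executed. The install path, application ID, broker address, and topic names are hypothetical, and <code>RUN</code> is a helper variable introduced for this example that makes the script print the command instead of running it.</p>

```shell
# Hypothetical placeholders; replace them with your own values.
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"
APPLICATION_ID="my-streams-app"
BOOTSTRAP_SERVERS="localhost:9092"
INPUT_TOPICS="orders,payments"

# RUN defaults to "echo" (print only); start with RUN= to execute for real.
RUN="${RUN-echo}"

# Preview the reset of the listed input topics without changing anything.
preview_reset() {
  $RUN "$KAFKA_HOME/bin/kafka-streams-application-reset" \
    --application-id "$APPLICATION_ID" \
    --bootstrap-servers "$BOOTSTRAP_SERVERS" \
    --input-topics "$INPUT_TOPICS" \
    --dry-run
}

preview_reset
```

<p>Once the previewed actions look right, the same command without <code>--dry-run</code> performs the reset.</p>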
<p>The following reset-offset scenarios are available for <code>input-topics</code>:</p>
<ul>
<li> by-duration</li>
<li> from-file</li>
<li> shift-by</li>
<li> to-datetime</li>
<li> to-earliest</li>
<li> to-latest</li>
<li> to-offset</li>
</ul>
<p>You can specify at most one of these scenarios. If you specify none, <code>to-earliest</code> is applied by default.</p>
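<p>A wrapper script can check the one-scenario rule before it invokes the tool. The sketch below assumes two hypothetical helper functions, <code>count_scenarios</code> and <code>print_reset_command</code>, which are not part of Kafka; the application ID, topic name, and duration are example values.</p>

```shell
# Count how many reset-offset scenario options appear in the arguments.
# count_scenarios is a hypothetical helper, not part of Kafka.
count_scenarios() {
  n=0
  for arg in "$@"; do
    case "$arg" in
      --by-duration|--from-file|--shift-by|--to-datetime|--to-earliest|--to-latest|--to-offset)
        n=$((n + 1))
        ;;
    esac
  done
  echo "$n"
}

# Print the reset command only if at most one scenario is given.
print_reset_command() {
  if [ "$(count_scenarios "$@")" -gt 1 ]; then
    echo "error: specify at most one reset-offset scenario" >&2
    return 1
  fi
  echo "kafka-streams-application-reset $*"
}

# Example values: reset the input topic by a duration of one day.
print_reset_command \
  --application-id my-streams-app \
  --input-topics my-input-topic \
  --by-duration P1DT0H0M0S
```

<p>The check only looks at option names, so it does not validate the option values themselves.</p>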
<p>All the other parameters can be combined as needed. For example, if you want to restart an application from an
empty internal state, but not reprocess previous data, simply omit the parameters <code class="docutils literal"><span class="pre">--input-topics</span></code> and
<code class="docutils literal"><span class="pre">--intermediate-topics</span></code>.</p>
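<p>The case described above, restarting from an empty internal state without reprocessing, might look like the following sketch. Only the application ID and the brokers are passed; both values and the install path are hypothetical, and <code>RUN</code> is an example-only helper variable that prints the command instead of executing it.</p>

```shell
# Hypothetical placeholders; replace them with your own values.
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"
APPLICATION_ID="my-streams-app"
BOOTSTRAP_SERVERS="localhost:9092"

# RUN defaults to "echo" (print only); start with RUN= to execute for real.
RUN="${RUN-echo}"

# No --input-topics and no --intermediate-topics are passed here.
reset_internal_state_only() {
  $RUN "$KAFKA_HOME/bin/kafka-streams-application-reset" \
    --application-id "$APPLICATION_ID" \
    --bootstrap-servers "$BOOTSTRAP_SERVERS"
}

reset_internal_state_only
```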
</div>
<div class="section" id="step-2-reset-the-local-environments-of-your-application-instances">
<span id="streams-developer-guide-reset-local-environment"></span><h2>Step 2: Reset the local environments of your application instances<a class="headerlink" href="#step-2-reset-the-local-environments-of-your-application-instances" title="Permalink to this headline"></a></h2>
<p>For a complete application reset, you must delete the application&#8217;s local state directory on any machines where the
application instance was run. You must do this before restarting an application instance on the same machine. You can
use either of these methods:</p>
<ul class="simple">
<li>Call the API method <code class="docutils literal"><span class="pre">KafkaStreams#cleanUp()</span></code> in your application code. Call it only while the instance is not running, that is, before <code class="docutils literal"><span class="pre">start()</span></code> or after <code class="docutils literal"><span class="pre">close()</span></code>.</li>
<li>Manually delete the corresponding local state directory (default location: <code class="docutils literal"><span class="pre">/tmp/kafka-streams/&lt;application.id&gt;</span></code>). For more information, see the <a href="/{{version}}/javadoc/org/apache/kafka/streams/StreamsConfig.html#STATE_DIR_CONFIG">Streams</a> javadocs.</li>
</ul>
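<p>The manual method can be scripted. The sketch below assumes a hypothetical helper, <code>reset_local_state</code>, that takes the state directory and the application ID as arguments. To stay safe to run as-is, the demonstration works on a throwaway directory rather than on <code>/tmp/kafka-streams</code>, and the subdirectory name <code>0_0</code> is only an example.</p>

```shell
# Delete the local state directory of one application.
# Arguments: <state.dir> <application.id>
# reset_local_state is a hypothetical helper, not part of Kafka.
reset_local_state() {
  state_dir="$1"
  app_id="$2"
  # Refuse empty arguments so the rm below can never target a parent directory.
  if [ -z "$state_dir" ] || [ -z "$app_id" ]; then
    echo "usage: reset_local_state <state.dir> <application.id>" >&2
    return 1
  fi
  rm -rf -- "$state_dir/$app_id"
}

# Demonstration against a temporary directory that stands in for state.dir.
demo_dir="$(mktemp -d)"
mkdir -p "$demo_dir/my-streams-app/0_0"
reset_local_state "$demo_dir" "my-streams-app"
[ ! -e "$demo_dir/my-streams-app" ] && echo "local state removed"
rmdir "$demo_dir"
```

<p>On a real machine, you would pass your configured state directory (by default <code>/tmp/kafka-streams</code>) and your <code>application.id</code>, and run the helper only while the application instance is stopped.</p>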
</div>
</div>
</div>
</div>
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/security" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/upgrade-guide" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
<!--#include virtual="../../../includes/_header.htm" -->
<!--#include virtual="../../../includes/_top.htm" -->
<div class="content documentation ">
<!--#include virtual="../../../includes/_nav.htm" -->
<div class="right">
<!--//#include virtual="../../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a></li>
<li><a href="/documentation/streams">Kafka Streams</a></li>
<li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
</ul>
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
//sticky secondary nav
var $navbar = $(".sub-nav-sticky"),
y_pos = $navbar.offset().top,
height = $navbar.height();
$(window).scroll(function() {
var scrollTop = $(window).scrollTop();
if (scrollTop > y_pos - height) {
$navbar.addClass("navbar-fixed")
} else if (scrollTop <= y_pos) {
$navbar.removeClass("navbar-fixed")
}
});
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>