<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<title>Storm State Management</title>
<!-- Bootstrap core CSS -->
<link href="/assets/css/bootstrap.min.css" rel="stylesheet">
<!-- Bootstrap theme -->
<link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">
<!-- Custom styles for this template -->
<link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
<link href="/css/style.css" rel="stylesheet">
<link href="/assets/css/owl.theme.css" rel="stylesheet">
<link href="/assets/css/owl.carousel.css" rel="stylesheet">
<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
<script type="text/javascript" src="/assets/js/storm.js"></script>
<!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
<!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<header>
<div class="container-fluid">
<div class="row">
<div class="col-md-5">
<a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
</div>
<div class="col-md-5">
<h1>Version: 1.1.2</h1>
</div>
<div class="col-md-2">
<a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
</div>
</div>
</div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
<div class="container-fluid">
<div class="navbar-header">
<button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
<ul class="nav navbar-nav">
<li><a href="/index.html" id="home">Home</a></li>
<li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
<li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/releases/2.0.0-SNAPSHOT/index.html">2.0.0-SNAPSHOT</a></li>
<li><a href="/releases/1.2.1/index.html">1.2.1</a></li>
<li><a href="/releases/1.1.2/index.html">1.1.2</a></li>
<li><a href="/releases/1.0.6/index.html">1.0.6</a></li>
</ul>
</li>
<li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
<li><a href="/contribute/People.html">People</a></li>
<li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
</ul>
</li>
<li><a href="/2018/02/19/storm121-released.html" id="news">News</a></li>
</ul>
</nav>
</div>
</div>
<div class="container-fluid">
<h1 class="page-title">Storm State Management</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><h1 id="state-support-in-core-storm">State support in core storm</h1>
<p>Storm core has abstractions for bolts to save and retrieve the state of their operations. There is a default in-memory
state implementation and also a Redis-backed implementation that provides state persistence.</p>
<h2 id="state-management">State management</h2>
<p>Bolts that require their state to be managed and persisted by the framework should implement the <code>IStatefulBolt</code> interface or
extend <code>BaseStatefulBolt</code> and implement the <code>void initState(T state)</code> method. The framework invokes <code>initState</code>
during bolt initialization, passing the previously saved state of the bolt. This happens after <code>prepare</code> but before the bolt starts
processing any tuples.</p>
<p>Currently the only kind of <code>State</code> implementation that is supported is <code>KeyValueState</code> which provides key-value mapping.</p>
<p>For example, a word count bolt could use the key-value state abstraction for the word counts as follows.</p>
<ol>
<li>Extend <code>BaseStatefulBolt</code> and parameterize it with <code>KeyValueState</code>, which stores the mapping of word to count.</li>
<li>The bolt gets initialized with its previously saved state in the <code>initState</code> method. This state contains the word counts
last committed by the framework during the previous run.</li>
<li><p>In the execute method, update the word count.</p>
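<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.Map;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseStatefulBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt extends BaseStatefulBolt&lt;KeyValueState&lt;String, Long&gt;&gt; {
    private KeyValueState&lt;String, Long&gt; wordCounts;
    private OutputCollector collector;
    ...
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void initState(KeyValueState&lt;String, Long&gt; state) {
        // Called after prepare() with the last committed word counts.
        wordCounts = state;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        // The value type is Long, so the default must be 0L rather than 0.
        Long count = wordCounts.get(word, 0L);
        count++;
        wordCounts.put(word, count);
        // Anchor the emitted tuple to the input, then ack the input.
        collector.emit(tuple, new Values(word, count));
        collector.ack(tuple);
    }
    ...
}
</code></pre></div></li>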
<li><p>The framework periodically checkpoints the state of the bolt (by default, every second). The frequency
can be changed by setting the Storm config <code>topology.state.checkpoint.interval.ms</code>.</p></li>
<li><p>For state persistence, use a state provider that supports persistence by setting <code>topology.state.provider</code> in the
Storm config. For example, to use the Redis-based key-value state implementation, set <code>topology.state.provider: org.apache.storm.redis.state.RedisKeyValueStateProvider</code>
in storm.yaml. The provider implementation jar must be on the class path, which in this case means putting <code>storm-redis-*.jar</code>
in the extlib directory.</p></li>
<li><p>The state provider properties can be overridden by setting <code>topology.state.provider.config</code>. For the Redis state, this is a
JSON config with the following properties.</p></li>
</ol>
<div class="highlight"><pre><code class="language-" data-lang="">{
"keyClass": "Optional fully qualified class name of the Key type.",
"valueClass": "Optional fully qualified class name of the Value type.",
"keySerializerClass": "Optional Key serializer implementation class.",
"valueSerializerClass": "Optional Value Serializer implementation class.",
"jedisPoolConfig": {
"host": "localhost",
"port": 6379,
"timeout": 2000,
"database": 0,
"password": "xyz"
}
}
</code></pre></div>
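<p>As a rough illustration, the settings from the steps above could be collected programmatically. The sketch below uses a plain <code>java.util.Map</code> as a stand-in for the topology configuration so that it stays self-contained, and it assumes the provider config is passed as a JSON string. The helper names <code>redisProviderConfig</code> and <code>statefulTopologyConf</code> are hypothetical; the config keys and the provider class name are the ones listed above, and the host, port, and interval values are placeholders.</p>

```java
import java.util.LinkedHashMap;
import java.util.Map;

class StateConfigSketch {
    // Hypothetical helper: builds a minimal provider config by hand
    // (only the jedisPoolConfig host and port) to avoid a JSON library.
    static String redisProviderConfig(String host, int port) {
        return "{\"jedisPoolConfig\": {\"host\": \"" + host + "\", \"port\": " + port + "}}";
    }

    // Hypothetical helper: collects the state-related settings in one map.
    static Map<String, Object> statefulTopologyConf(long checkpointIntervalMs) {
        Map<String, Object> conf = new LinkedHashMap<>();
        conf.put("topology.state.provider",
                 "org.apache.storm.redis.state.RedisKeyValueStateProvider");
        conf.put("topology.state.provider.config", redisProviderConfig("localhost", 6379));
        conf.put("topology.state.checkpoint.interval.ms", checkpointIntervalMs);
        return conf;
    }

    public static void main(String[] args) {
        // Print each setting as "key = value".
        statefulTopologyConf(5000L).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

<p>In a real topology these entries would go into the topology configuration or storm.yaml rather than a bare map.</p>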
<h2 id="checkpoint-mechanism">Checkpoint mechanism</h2>
<p>A checkpoint is triggered by an internal checkpoint spout at the specified <code>topology.state.checkpoint.interval.ms</code>. If there is
at least one <code>IStatefulBolt</code> in the topology, the checkpoint spout is automatically added by the topology builder. For stateful topologies,
the topology builder wraps the <code>IStatefulBolt</code> in a <code>StatefulBoltExecutor</code>, which handles the state commits on receiving the checkpoint tuples.
The non-stateful bolts are wrapped in a <code>CheckpointTupleForwarder</code>, which just forwards the checkpoint tuples so that they
can flow through the topology DAG. The checkpoint tuples flow through a separate internal stream named <code>$checkpoint</code>. The topology builder
wires the checkpoint stream across the whole topology with the checkpoint spout at the root.</p>
<div class="highlight"><pre><code class="language-" data-lang="">               default                        default               default
[spout1]   ---------------&gt; [statefulbolt1] ----------&gt; [bolt1] --------------&gt; [statefulbolt2]
                          |                 ----------&gt;         --------------&gt;
                          |                   ($chpt)               ($chpt)
                          |
[$checkpointspout] _______| ($chpt)
</code></pre></div>
<p>At checkpoint intervals the checkpoint tuples are emitted by the checkpoint spout. On receiving a checkpoint tuple, the state of the bolt
is saved and then the checkpoint tuple is forwarded to the next component. Each bolt waits for the checkpoint to arrive on all its input
streams before it saves its state so that the state represents a consistent state across the topology. Once the checkpoint spout receives
ACK from all the bolts, the state commit is complete and the transaction is recorded as committed by the checkpoint spout.</p>
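<p>The waiting behavior described above can be sketched in isolation. The class below is a simplified, hypothetical model and not Storm's <code>StatefulBoltExecutor</code>: it tracks a single checkpoint at a time, identifies inputs by name, and copies a map to stand in for saving the state.</p>

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class CheckpointAlignmentSketch {
    private final Set<String> inputs;                     // names of all upstream inputs
    private final Set<String> arrived = new HashSet<>();  // inputs that delivered the checkpoint
    private final Map<String, Long> liveState = new HashMap<>();
    private Map<String, Long> savedState = new HashMap<>();

    CheckpointAlignmentSketch(Set<String> inputs) {
        this.inputs = new HashSet<>(inputs);
    }

    // Ordinary tuples keep updating the live state.
    void onDataTuple(String word) {
        liveState.merge(word, 1L, Long::sum);
    }

    // Returns true once the checkpoint has arrived on every input; at that point
    // the state is saved and the caller would forward the checkpoint downstream.
    boolean onCheckpointTuple(String fromInput) {
        arrived.add(fromInput);
        if (!arrived.containsAll(inputs)) {
            return false; // still waiting for the remaining inputs
        }
        savedState = new HashMap<>(liveState);
        arrived.clear();
        return true;
    }

    Map<String, Long> savedState() {
        return savedState;
    }

    public static void main(String[] args) {
        CheckpointAlignmentSketch bolt =
            new CheckpointAlignmentSketch(new HashSet<>(Arrays.asList("spout1", "bolt1")));
        bolt.onDataTuple("storm");
        System.out.println(bolt.onCheckpointTuple("spout1")); // checkpoint seen on one input only
        System.out.println(bolt.onCheckpointTuple("bolt1"));  // checkpoint seen on all inputs
        System.out.println(bolt.savedState());
    }
}
```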
<p>State checkpointing does not currently checkpoint the state of the spout. However, once the state of all bolts is checkpointed and the checkpoint tuples are acked, the tuples emitted by the spout are also acked.
This implies that <code>topology.state.checkpoint.interval.ms</code> should be lower than <code>topology.message.timeout.secs</code> (note that the former is in milliseconds and the latter in seconds).</p>
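<p>Because the two settings use different units, a quick sanity check can help. The following is a minimal sketch; the helper name <code>intervalFitsTimeout</code> is hypothetical and the values in <code>main</code> are only illustrative.</p>

```java
class CheckpointIntervalCheck {
    // True when the checkpoint interval (milliseconds) is strictly below
    // the message timeout (seconds) after converting both to milliseconds.
    static boolean intervalFitsTimeout(long checkpointIntervalMs, long messageTimeoutSecs) {
        return checkpointIntervalMs < messageTimeoutSecs * 1000L;
    }

    public static void main(String[] args) {
        System.out.println(intervalFitsTimeout(1000L, 30L));  // 1 s interval, 30 s timeout
        System.out.println(intervalFitsTimeout(60000L, 30L)); // 60 s interval, 30 s timeout
    }
}
```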
<p>The state commit works like a three phase commit protocol with a prepare and commit phase so that the state across the topology is saved
in a consistent and atomic manner.</p>
<h3 id="recovery">Recovery</h3>
<p>The recovery phase is triggered when the topology is started for the first time. If the previous transaction was not successfully
prepared, a <code>rollback</code> message is sent across the topology so that any bolt holding prepared transactions can discard them.
If the previous transaction was prepared successfully but not committed, a <code>commit</code> message is sent across the topology so that
the prepared transactions can be committed. After these steps are complete, the bolts are initialized with the state.</p>
<p>Recovery is also triggered if one of the bolts fails to acknowledge the checkpoint message or if, say, a worker crashes in
the middle of a checkpoint. When the worker is restarted by the supervisor, the checkpoint mechanism makes sure that the bolt gets
initialized with its previous state and that checkpointing continues from the point where it left off.</p>
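<p>The prepare, commit, and rollback steps and the recovery decision can be illustrated with a small in-memory model. This is a sketch under simplifying assumptions, not Storm's implementation: the class <code>PreparableStateSketch</code> and its <code>recover</code> method are hypothetical, and snapshots are plain map copies.</p>

```java
import java.util.HashMap;
import java.util.Map;

class PreparableStateSketch {
    private Map<String, Long> working = new HashMap<>();   // updates since the last commit
    private Map<String, Long> prepared = null;             // snapshot waiting for commit
    private Map<String, Long> committed = new HashMap<>(); // last committed snapshot
    private long preparedTxid = -1L;

    void put(String key, long value) { working.put(key, value); }

    long get(String key, long defaultValue) { return working.getOrDefault(key, defaultValue); }

    // Prepare phase: snapshot the current state under the given transaction id.
    void prepareCommit(long txid) {
        prepared = new HashMap<>(working);
        preparedTxid = txid;
    }

    // Commit phase: promote the prepared snapshot if it belongs to this transaction.
    void commit(long txid) {
        if (prepared != null && preparedTxid == txid) {
            committed = prepared;
            prepared = null;
        }
    }

    // Discard any prepared snapshot and any uncommitted updates.
    void rollback() {
        prepared = null;
        working = new HashMap<>(committed);
    }

    // Recovery: commit a transaction that was prepared, otherwise roll back,
    // then return the state the bolt would be initialized with.
    Map<String, Long> recover(boolean lastTxPrepared, long lastTxid) {
        if (lastTxPrepared) {
            commit(lastTxid);
        } else {
            rollback();
        }
        working = new HashMap<>(committed);
        return new HashMap<>(working);
    }

    public static void main(String[] args) {
        PreparableStateSketch state = new PreparableStateSketch();
        state.put("storm", 1L);
        state.prepareCommit(1L);
        state.commit(1L);
        state.put("storm", 2L);
        state.prepareCommit(2L); // simulated crash: prepared but not committed
        System.out.println(state.recover(true, 2L));
    }
}
```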
<h3 id="guarantee">Guarantee</h3>
<p>Storm relies on the acking mechanism to replay tuples in case of failures. It is possible that the state is committed
but the worker crashes before acking the tuples. In this case the tuples are replayed, causing duplicate state updates.
Also, the StatefulBoltExecutor currently continues to process tuples from a stream after it has received a checkpoint
tuple on that stream, while it waits for the checkpoint to arrive on its other input streams before saving the state. This can also cause
duplicate state updates during recovery.</p>
<p>The state abstraction does not eliminate duplicate evaluations and currently provides only an at-least-once guarantee.</p>
<p>In order to provide the at-least-once guarantee, all bolts in a stateful topology are expected to anchor the tuples while emitting and to ack the input tuples once they are processed. For non-stateful bolts, the anchoring/acking can be automatically managed by extending the <code>BaseBasicBolt</code>. Stateful bolts are expected to anchor tuples while emitting and ack the tuple after processing, as in the <code>WordCountBolt</code> example in the State management section above.</p>
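<p>The first duplicate-update scenario can be reproduced with a toy model. The sketch below is hypothetical and does not use Storm: it counts one word, commits the state, and then optionally simulates a crash before the ack, after which the state is restored from the last commit and the same tuple is replayed.</p>

```java
import java.util.HashMap;
import java.util.Map;

class ReplayDuplicateSketch {
    // Returns the final count for a word that was actually emitted only once.
    static long countAfterReplay(boolean crashBeforeAck) {
        Map<String, Long> committed = new HashMap<>();
        Map<String, Long> live = new HashMap<>();

        live.merge("storm", 1L, Long::sum);     // the tuple is processed once
        committed.putAll(live);                 // a checkpoint commits the state

        if (crashBeforeAck) {
            live = new HashMap<>(committed);    // restart: state restored from the last commit
            live.merge("storm", 1L, Long::sum); // the un-acked tuple is replayed
        }
        return live.get("storm");
    }

    public static void main(String[] args) {
        System.out.println("no crash: " + countAfterReplay(false));
        System.out.println("crash before ack: " + countAfterReplay(true));
    }
}
```

<p>With the simulated crash the word is counted twice, which is the duplicate update that the at-least-once guarantee allows.</p>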
<h3 id="istateful-bolt-hooks">IStatefulBolt hooks</h3>
<p>The <code>IStatefulBolt</code> interface provides hook methods in which stateful bolts can implement custom actions.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="cm">/**
* This is a hook for the component to perform some actions just before the
* framework commits its state.
*/</span>
<span class="kt">void</span> <span class="nf">preCommit</span><span class="o">(</span><span class="kt">long</span> <span class="n">txid</span><span class="o">);</span>
<span class="cm">/**
* This is a hook for the component to perform some actions just before the
* framework prepares its state.
*/</span>
<span class="kt">void</span> <span class="nf">prePrepare</span><span class="o">(</span><span class="kt">long</span> <span class="n">txid</span><span class="o">);</span>
<span class="cm">/**
* This is a hook for the component to perform some actions just before the
* framework rolls back the prepared state.
*/</span>
<span class="kt">void</span> <span class="nf">preRollback</span><span class="o">();</span>
</code></pre></div>
<p>Implementing these hooks is optional, and stateful bolts are not expected to provide any implementation. The hooks are provided so that other
system-level components can be built on top of the stateful abstractions, where some action may be needed before the
stateful bolt&#39;s state is prepared, committed or rolled back.</p>
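<p>To illustrate when the hooks fire relative to the state operations, here is a minimal sketch. The nested <code>Hooks</code> interface is a hypothetical stand-in with no-op defaults, and <code>runCheckpoint</code> only records the order of events for one successful checkpoint; it is not how the framework is implemented.</p>

```java
import java.util.ArrayList;
import java.util.List;

class HookOrderSketch {
    // Hypothetical stand-in for the hook methods listed above; the no-op
    // defaults reflect that implementing them is optional.
    interface Hooks {
        default void prePrepare(long txid) {}
        default void preCommit(long txid) {}
        default void preRollback() {}
    }

    // Runs one successful checkpoint and records the order of events.
    static List<String> runCheckpoint(Hooks hooks, long txid, List<String> log) {
        hooks.prePrepare(txid);       // hook runs just before the state is prepared
        log.add("prepare " + txid);
        hooks.preCommit(txid);        // hook runs just before the state is committed
        log.add("commit " + txid);
        return log;
    }

    // Builds hooks that write their invocations to the same log.
    static Hooks loggingHooks(List<String> log) {
        return new Hooks() {
            @Override public void prePrepare(long txid) { log.add("prePrepare " + txid); }
            @Override public void preCommit(long txid) { log.add("preCommit " + txid); }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(runCheckpoint(loggingHooks(log), 7L, log));
    }
}
```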
<h2 id="providing-custom-state-implementations">Providing custom state implementations</h2>
<p>Currently the only kind of <code>State</code> implementation supported is <code>KeyValueState</code> which provides key-value mapping.</p>
<p>Custom state implementations should provide implementations for the methods defined in the <code>org.apache.storm.state.State</code> interface.
These are the <code>void prepareCommit(long txid)</code>, <code>void commit(long txid)</code>, and <code>rollback()</code> methods. The no-argument <code>commit()</code> method is optional
and is useful if the bolt manages the state on its own. This is currently used only by internal system components,
e.g. the CheckpointSpout, to save their state.</p>
<p>A <code>KeyValueState</code> implementation should also implement the methods defined in the <code>org.apache.storm.state.KeyValueState</code> interface.</p>
<h3 id="state-provider">State provider</h3>
<p>The framework instantiates the state via the corresponding <code>StateProvider</code> implementation. A custom state should also provide
a <code>StateProvider</code> implementation which can load and return the state based on the namespace. Each state belongs to a unique namespace.
The namespace is typically unique per task so that each task can have its own state. The StateProvider and the corresponding
State implementation should be available in the class path of Storm (by placing them in the extlib directory).</p>
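<p>The namespace lookup can be pictured with a small in-memory model. This is a hypothetical sketch, not the <code>StateProvider</code> interface itself: the single-argument <code>newState</code> method, the backing map, and the namespace strings used in <code>main</code> are all assumptions made for illustration.</p>

```java
import java.util.HashMap;
import java.util.Map;

class NamespacedProviderSketch {
    // One state map per namespace, so each task sees only its own entries.
    private final Map<String, Map<String, Long>> store = new HashMap<>();

    // Returns the state for the namespace, creating an empty one on first use.
    Map<String, Long> newState(String namespace) {
        return store.computeIfAbsent(namespace, ns -> new HashMap<>());
    }

    public static void main(String[] args) {
        NamespacedProviderSketch provider = new NamespacedProviderSketch();
        provider.newState("wordcount-1").put("storm", 3L); // hypothetical per-task namespace
        System.out.println(provider.newState("wordcount-1"));
        System.out.println(provider.newState("wordcount-2"));
    }
}
```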
</div>
</div>
</div>
</div>
<footer>
<div class="container-fluid">
<div class="row">
<div class="col-md-3">
<div class="footer-widget">
<h5>Meetups</h5>
<ul class="latest-news">
<li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
<li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
<li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
<li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
<li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
<li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
<!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> -->
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>About Storm</h5>
<p>Storm integrates with any queueing system and any database system. Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Storm with database systems is easy.</p>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>First Look</h5>
<ul class="footer-list">
<li><a href="/releases/current/Rationale.html">Rationale</a></li>
<li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
<li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
<li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Storm project</a></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>Documentation</h5>
<ul class="footer-list">
<li><a href="/releases/current/index.html">Index</a></li>
<li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
<li><a href="/releases/current/FAQ.html">FAQ</a></li>
</ul>
</div>
</div>
</div>
<hr/>
<div class="row">
<div class="col-md-12">
<p align="center">Copyright © 2015 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved.
<br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation.
<br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
</div>
</div>
</div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span>
</body>
</html>