<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<title>Trident State</title>
<!-- Bootstrap core CSS -->
<link href="/assets/css/bootstrap.min.css" rel="stylesheet">
<!-- Bootstrap theme -->
<link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">
<!-- Custom styles for this template -->
<link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
<link href="/css/style.css" rel="stylesheet">
<link href="/assets/css/owl.theme.css" rel="stylesheet">
<link href="/assets/css/owl.carousel.css" rel="stylesheet">
<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
<script type="text/javascript" src="/assets/js/storm.js"></script>
<!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
<!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<header>
<div class="container-fluid">
<div class="row">
<div class="col-md-5">
<a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
</div>
<div class="col-md-5">
<h1>Version: 2.1.0</h1>
</div>
<div class="col-md-2">
<a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
</div>
</div>
</div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
<div class="container-fluid">
<div class="navbar-header">
<button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
<ul class="nav navbar-nav">
<li><a href="/index.html" id="home">Home</a></li>
<li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
<li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/releases/2.3.0/index.html">2.3.0</a></li>
<li><a href="/releases/2.2.0/index.html">2.2.0</a></li>
<li><a href="/releases/2.1.0/index.html">2.1.0</a></li>
<li><a href="/releases/2.0.0/index.html">2.0.0</a></li>
<li><a href="/releases/1.2.3/index.html">1.2.3</a></li>
</ul>
</li>
<li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
<li><a href="/contribute/People.html">People</a></li>
<li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
</ul>
</li>
<li><a href="/2021/09/27/storm230-released.html" id="news">News</a></li>
</ul>
</nav>
</div>
</div>
<div class="container-fluid">
<h1 class="page-title">Trident State</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><p>Trident has first-class abstractions for reading from and writing to stateful sources. The state can either be internal to the topology – e.g., kept in-memory and backed by HDFS – or externally stored in a database like Memcached or Cassandra. There&#39;s no difference in the Trident API for either case.</p>
<p>Trident manages state in a fault-tolerant way so that state updates are idempotent in the face of retries and failures. This lets you reason about Trident topologies as if each message were processed exactly-once.</p>
<p>There are various levels of fault-tolerance possible when doing state updates. Before getting to those, let&#39;s look at an example that illustrates the tricks necessary to achieve exactly-once semantics. Suppose that you&#39;re doing a count aggregation of your stream and want to store the running count in a database. Now suppose you store in the database a single value representing the count, and every time you process a new tuple you increment the count.</p>
<p>When failures occur, tuples will be replayed. This brings up a problem when doing state updates (or anything with side effects) – you have no idea if you&#39;ve ever successfully updated the state based on this tuple before. Perhaps you never processed the tuple before, in which case you should increment the count. Perhaps you&#39;ve processed the tuple and successfully incremented the count, but the tuple failed processing in another step. In this case, you should not increment the count. Or perhaps you saw the tuple before but got an error when updating the database. In this case, you <em>should</em> update the database.</p>
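<p>The replay problem can be made concrete with a minimal sketch. The NaiveCounter class below is hypothetical (it is not part of Storm) and stands in for a database row that stores only the count:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// Hypothetical stand-in for a database row that holds nothing but the count.
final class NaiveCounter {
    long count = 0;

    // Increments unconditionally, because nothing records which tuples
    // have already been applied to the count.
    void process(String tuple) {
        count += 1;
    }
}
</code></pre></div>
<p>If a single tuple is processed and then replayed after a failure in a later step, process runs twice and the stored count becomes 2, even though only one tuple was ever in the stream.</p>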
<p>By just storing the count in the database, you have no idea whether or not this tuple has been processed before. So you need more information in order to make the right decision. Trident provides the following primitives, which are sufficient for achieving exactly-once processing semantics:</p>
<ol>
<li>Tuples are processed as small batches (see <a href="Trident-tutorial.html">the tutorial</a>)</li>
<li>Each batch of tuples is given a unique id called the &quot;transaction id&quot; (txid). If the batch is replayed, it is given the exact same txid.</li>
<li>State updates are ordered among batches. That is, the state updates for batch 3 won&#39;t be applied until the state updates for batch 2 have succeeded.</li>
</ol>
<p>With these primitives, your State implementation can detect whether or not the batch of tuples has been processed before and take the appropriate action to update the state in a consistent way. The action you take depends on the exact semantics provided by your input spouts as to what&#39;s in each batch. There are three kinds of spouts possible with respect to fault-tolerance: &quot;non-transactional&quot;, &quot;transactional&quot;, and &quot;opaque transactional&quot;. Likewise, there are three kinds of state possible with respect to fault-tolerance: &quot;non-transactional&quot;, &quot;transactional&quot;, and &quot;opaque transactional&quot;. Let&#39;s take a look at each spout type and see what kind of fault-tolerance you can achieve with each.</p>
<h2 id="transactional-spouts">Transactional spouts</h2>
<p>Remember, Trident processes tuples as small batches with each batch being given a unique transaction id. The properties of spouts vary according to the guarantees they can provide as to what&#39;s in each batch. A transactional spout has the following properties:</p>
<ol>
<li>Batches for a given txid are always the same. Replays of batches for a txid will emit the exact same set of tuples as the first time that batch was emitted for that txid.</li>
<li>There&#39;s no overlap between batches of tuples (tuples are in one batch or another, never multiple).</li>
<li>Every tuple is in a batch (no tuples are skipped).</li>
</ol>
<p>This is a pretty easy type of spout to understand: the stream is divided into fixed batches that never change. Storm has <a href="http://github.com/apache/storm/tree/v2.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional">an implementation of a transactional spout</a> for Kafka.</p>
<p>You might be wondering – why wouldn&#39;t you just always use a transactional spout? They&#39;re simple and easy to understand. One reason you might not use one is that they&#39;re not necessarily very fault-tolerant. For example, the way TransactionalTridentKafkaSpout works is the batch for a txid will contain tuples from all the Kafka partitions for a topic. Once a batch has been emitted, any time that batch is re-emitted in the future the exact same set of tuples must be emitted to meet the semantics of transactional spouts. Now suppose a batch is emitted from TransactionalTridentKafkaSpout, the batch fails to process, and at the same time one of the Kafka nodes goes down. You&#39;re now incapable of replaying the same batch as you did before (since the node is down and some partitions for the topic are unavailable), and processing will halt.</p>
<p>This is why &quot;opaque transactional&quot; spouts exist – they are fault-tolerant to losing source nodes while still allowing you to achieve exactly-once processing semantics. We&#39;ll cover those spouts in the next section though.</p>
<p>(One side note – Kafka has supported partition replication since version 0.8, so a transactional spout can remain able to replay a batch after a node failure as long as a replica of each affected partition is still available.)</p>
<p>Before we get to &quot;opaque transactional&quot; spouts, let&#39;s look at how you would design a State implementation that has exactly-once semantics for transactional spouts. This State type is called a &quot;transactional state&quot; and takes advantage of the fact that any given txid is always associated with the exact same set of tuples.</p>
<p>Suppose your topology computes word count and you want to store the word counts in a key/value database. The key will be the word, and the value will contain the count. You&#39;ve already seen that storing just the count as the value isn&#39;t sufficient to know whether you&#39;ve processed a batch of tuples before. Instead, what you can do is store the transaction id with the count in the database as an atomic value. Then, when updating the count, you can just compare the transaction id in the database with the transaction id for the current batch. If they&#39;re the same, you skip the update – because of the strong ordering, you know for sure that the value in the database incorporates the current batch. If they&#39;re different, you increment the count. This logic works because the batch for a txid never changes, and Trident ensures that state updates are ordered among batches.</p>
<p>Consider this example of why it works. Suppose you are processing txid 3 which consists of the following batch of tuples:</p>
<div class="highlight"><pre><code class="language-" data-lang="">["man"]
["man"]
["dog"]
</code></pre></div>
<p>Suppose the database currently holds the following key/value pairs:</p>
<div class="highlight"><pre><code class="language-" data-lang="">man =&gt; [count=3, txid=1]
dog =&gt; [count=4, txid=3]
apple =&gt; [count=10, txid=2]
</code></pre></div>
<p>The txid associated with &quot;man&quot; is txid 1. Since the current txid is 3, you know for sure that this batch of tuples is not represented in that count. So you can go ahead and increment the count by 2 and update the txid. On the other hand, the txid for &quot;dog&quot; is the same as the current txid. So you know for sure that the increment from the current batch is already represented in the database for the &quot;dog&quot; key. So you can skip the update. After completing updates, the database looks like this:</p>
<div class="highlight"><pre><code class="language-" data-lang="">man =&gt; [count=5, txid=3]
dog =&gt; [count=4, txid=3]
apple =&gt; [count=10, txid=2]
</code></pre></div>
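<p>The comparison described above could be sketched as follows. This is a minimal illustration rather than Trident&#39;s own implementation: the TxidCount class and its apply method are hypothetical names, and a real state would read and write the count and txid atomically in the database.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// Hypothetical value type: the count and the txid that last updated it,
// stored together as one atomic value.
final class TxidCount {
    final long count;
    final long txid;

    TxidCount(long count, long txid) {
        this.count = count;
        this.txid = txid;
    }

    // Returns the value to store after applying one batch's partial count.
    TxidCount apply(long batchTxid, long partialCount) {
        if (batchTxid == txid) {
            // Same txid: this batch is already reflected in the count,
            // so the update is skipped.
            return this;
        }
        // Different txid: add the partial count and record the new txid.
        return new TxidCount(count + partialCount, batchTxid);
    }
}
</code></pre></div>
<p>Applied to the example, new TxidCount(3, 1).apply(3, 2) yields count=5 and txid=3 for &quot;man&quot;, while new TxidCount(4, 3).apply(3, 1) leaves &quot;dog&quot; unchanged at count=4 and txid=3.</p>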
<p>Let&#39;s now look at opaque transactional spouts and how to design states for that type of spout.</p>
<h2 id="opaque-transactional-spouts">Opaque transactional spouts</h2>
<p>As described before, an opaque transactional spout cannot guarantee that the batch of tuples for a txid remains constant. An opaque transactional spout has the following property:</p>
<ol>
<li>Every tuple is <em>successfully</em> processed in exactly one batch. However, it&#39;s possible for a tuple to fail to process in one batch and then succeed to process in a later batch.</li>
</ol>
<p><a href="http://github.com/apache/storm/tree/v2.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java">KafkaTridentSpoutOpaque</a> is a spout that has this property and is fault-tolerant to losing Kafka nodes. Whenever it&#39;s time for KafkaTridentSpoutOpaque to emit a batch, it emits tuples starting from where the last batch finished emitting. This ensures that no tuple is ever skipped or successfully processed by multiple batches.</p>
<p>With opaque transactional spouts, it&#39;s no longer possible to use the trick of skipping state updates if the transaction id in the database is the same as the transaction id for the current batch. This is because the batch may have changed between state updates.</p>
<p>What you can do is store more state in the database. Rather than store a value and transaction id in the database, you instead store a value, transaction id, and the previous value in the database. Let&#39;s again use the example of storing a count in the database. Suppose the partial count for your batch is &quot;2&quot; and it&#39;s time to apply a state update. Suppose the value in the database looks like this:</p>
<div class="highlight"><pre><code class="language-" data-lang="">{ value = 4,
prevValue = 1,
txid = 2
}
</code></pre></div>
<p>Suppose your current txid is 3, different than what&#39;s in the database. In this case, you set &quot;prevValue&quot; equal to &quot;value&quot;, increment &quot;value&quot; by your partial count, and update the txid. The new database value will look like this:</p>
<div class="highlight"><pre><code class="language-" data-lang="">{ value = 6,
prevValue = 4,
txid = 3
}
</code></pre></div>
<p>Now suppose your current txid is 2, equal to what&#39;s in the database. Now you know that the &quot;value&quot; in the database contains an update from a previous batch for your current txid, but that batch may have been different so you have to ignore it. What you do in this case is increment &quot;prevValue&quot; by your partial count to compute the new &quot;value&quot;. You then set the value in the database to this:</p>
<div class="highlight"><pre><code class="language-" data-lang="">{ value = 3,
prevValue = 1,
txid = 2
}
</code></pre></div>
<p>This works because of the strong ordering of batches provided by Trident. Once Trident moves on to a new batch for state updates, it will never go back to a previous batch. And since opaque transactional spouts guarantee no overlap between batches – that each tuple is successfully processed by one batch – you can safely update based on the previous value.</p>
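<p>The two cases above could be sketched as follows. This is an illustration under stated assumptions, not Trident&#39;s own implementation: OpaqueCount and apply are hypothetical names, and a real state would store the three fields as one atomic value in the database.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// Hypothetical value type for an opaque transactional count.
final class OpaqueCount {
    final long value;
    final long prevValue;
    final long txid;

    OpaqueCount(long value, long prevValue, long txid) {
        this.value = value;
        this.prevValue = prevValue;
        this.txid = txid;
    }

    // Returns the value to store after applying one batch's partial count.
    OpaqueCount apply(long batchTxid, long partialCount) {
        if (batchTxid == txid) {
            // The stored value includes an earlier attempt at this txid whose
            // batch may have differed, so it is recomputed from prevValue.
            return new OpaqueCount(prevValue + partialCount, prevValue, txid);
        }
        // New txid: the current value becomes the previous value.
        return new OpaqueCount(value + partialCount, value, batchTxid);
    }
}
</code></pre></div>
<p>Starting from value=4, prevValue=1, txid=2 and a partial count of 2, apply(3, 2) gives value=6, prevValue=4, txid=3, and apply(2, 2) gives value=3, prevValue=1, txid=2, matching the two results shown above.</p>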
<h2 id="non-transactional-spouts">Non-transactional spouts</h2>
<p>Non-transactional spouts don&#39;t provide any guarantees about what&#39;s in each batch. Such a spout might have at-most-once processing, in which case tuples are not retried after failed batches. Or it might have at-least-once processing, where tuples can be processed successfully by multiple batches. There&#39;s no way to achieve exactly-once semantics for this kind of spout.</p>
<h2 id="summary-of-spout-and-state-types">Summary of spout and state types</h2>
<p>This diagram shows which combinations of spouts / states enable exactly-once messaging semantics:</p>
<p><img src="images/spout-vs-state.png" alt="Spouts vs States"></p>
<p>Opaque transactional states have the strongest fault-tolerance, but this comes at the cost of needing to store the txid and two values in the database. Transactional states require less state in the database, but only work with transactional spouts. Finally, non-transactional states require the least state in the database but cannot achieve exactly-once semantics.</p>
<p>The state and spout types you choose are a tradeoff between fault-tolerance and storage costs, and ultimately your application requirements will determine which combination is right for you.</p>
<h2 id="state-apis">State APIs</h2>
<p>You&#39;ve seen the intricacies of what it takes to achieve exactly-once semantics. The nice thing about Trident is that it internalizes all the fault-tolerance logic within the State – as a user you don&#39;t have to deal with comparing txids, storing multiple values in the database, or anything like that. You can write code like this:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TridentTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentTopology</span><span class="o">();</span>
<span class="n">TridentState</span> <span class="n">wordCounts</span> <span class="o">=</span>
<span class="n">topology</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="s">"spout1"</span><span class="o">,</span> <span class="n">spout</span><span class="o">)</span>
<span class="o">.</span><span class="na">each</span><span class="o">(</span><span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"sentence"</span><span class="o">),</span> <span class="k">new</span> <span class="n">Split</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"word"</span><span class="o">))</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"word"</span><span class="o">))</span>
<span class="o">.</span><span class="na">persistentAggregate</span><span class="o">(</span><span class="n">MemcachedState</span><span class="o">.</span><span class="na">opaque</span><span class="o">(</span><span class="n">serverLocations</span><span class="o">),</span> <span class="k">new</span> <span class="n">Count</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"count"</span><span class="o">))</span>
<span class="o">.</span><span class="na">parallelismHint</span><span class="o">(</span><span class="mi">6</span><span class="o">);</span>
</code></pre></div>
<p>All the logic necessary to manage opaque transactional state is internalized in the MemcachedState.opaque call. Additionally, updates are automatically batched to minimize roundtrips to the database.</p>
<p>The base State interface just has two methods:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">State</span> <span class="o">{</span>
<span class="kt">void</span> <span class="nf">beginCommit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">);</span> <span class="c1">// can be null for things like partitionPersist occurring off a DRPC stream</span>
<span class="kt">void</span> <span class="nf">commit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">);</span>
<span class="o">}</span>
</code></pre></div>
<p>You&#39;re told when a state update is beginning, when a state update is ending, and you&#39;re given the txid in each case. Trident assumes nothing about how your state works, what kind of methods there are to update it, and what kind of methods there are to read from it.</p>
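<p>To make the order of these callbacks visible, here is a small sketch. It assumes nothing beyond the two methods shown above: the State interface is redeclared locally so that the snippet compiles without Storm on the classpath, and RecordingState is a hypothetical class that only records the calls it receives.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// Local stand-in that mirrors the two methods of the State interface above.
interface State {
    void beginCommit(Long txid);
    void commit(Long txid);
}

// Hypothetical implementation that records each callback and its txid.
final class RecordingState implements State {
    final StringBuilder log = new StringBuilder();

    public void beginCommit(Long txid) {
        // txid may be null, as noted in the interface comment above.
        log.append("begin:").append(txid).append(';');
    }

    public void commit(Long txid) {
        log.append("commit:").append(txid).append(';');
    }
}
</code></pre></div>
<p>In a real topology the class would implement Storm&#39;s own State interface and Trident would make these calls itself; a state could use the pair to open and flush a write buffer, for example.</p>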
<p>Suppose you have a home-grown database that contains user location information and you want to be able to access it from Trident. Your State implementation would have methods for getting and setting user information:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">LocationDB</span> <span class="kd">implements</span> <span class="n">State</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">beginCommit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">)</span> <span class="o">{</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">commit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">)</span> <span class="o">{</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">setLocation</span><span class="o">(</span><span class="kt">long</span> <span class="n">userId</span><span class="o">,</span> <span class="n">String</span> <span class="n">location</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// code to access database and set location</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="n">String</span> <span class="nf">getLocation</span><span class="o">(</span><span class="kt">long</span> <span class="n">userId</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// code to get location from database</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
<p>You then provide Trident a StateFactory that can create instances of your State object within Trident tasks. The StateFactory for your LocationDB might look something like this:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">LocationDBFactory</span> <span class="kd">implements</span> <span class="n">StateFactory</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">State</span> <span class="nf">makeState</span><span class="o">(</span><span class="n">Map</span> <span class="n">conf</span><span class="o">,</span> <span class="kt">int</span> <span class="n">partitionIndex</span><span class="o">,</span> <span class="kt">int</span> <span class="n">numPartitions</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="k">new</span> <span class="nf">LocationDB</span><span class="o">();</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
<p>Trident provides the QueryFunction interface for writing Trident operations that query a source of state, and the StateUpdater interface for writing Trident operations that update a source of state. For example, let&#39;s write an operation &quot;QueryLocation&quot; that queries the LocationDB for the locations of users. Let&#39;s start off with how you would use it in a topology. Let&#39;s say this topology consumes an input stream of userids:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TridentTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentTopology</span><span class="o">();</span>
<span class="n">TridentState</span> <span class="n">locations</span> <span class="o">=</span> <span class="n">topology</span><span class="o">.</span><span class="na">newStaticState</span><span class="o">(</span><span class="k">new</span> <span class="n">LocationDBFactory</span><span class="o">());</span>
<span class="n">topology</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="s">"myspout"</span><span class="o">,</span> <span class="n">spout</span><span class="o">)</span>
<span class="o">.</span><span class="na">stateQuery</span><span class="o">(</span><span class="n">locations</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"userid"</span><span class="o">),</span> <span class="k">new</span> <span class="n">QueryLocation</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"location"</span><span class="o">));</span>
</code></pre></div>
<p>Now let&#39;s take a look at what the implementation of QueryLocation would look like:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">QueryLocation</span> <span class="kd">extends</span> <span class="n">BaseQueryFunction</span><span class="o">&lt;</span><span class="n">LocationDB</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="nf">batchRetrieve</span><span class="o">(</span><span class="n">LocationDB</span> <span class="n">state</span><span class="o">,</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">TridentTuple</span><span class="o">&gt;</span> <span class="n">inputs</span><span class="o">)</span> <span class="o">{</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">ret</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;();</span>
<span class="k">for</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="nl">input:</span> <span class="n">inputs</span><span class="o">)</span> <span class="o">{</span>
<span class="n">ret</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">state</span><span class="o">.</span><span class="na">getLocation</span><span class="o">(</span><span class="n">input</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)));</span>
<span class="o">}</span>
<span class="k">return</span> <span class="n">ret</span><span class="o">;</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="n">String</span> <span class="n">location</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span>
<span class="n">collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">location</span><span class="o">));</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
<p>QueryFunctions execute in two steps. First, Trident collects a batch of reads together and passes them to batchRetrieve. In this case, batchRetrieve will receive multiple user ids. batchRetrieve is expected to return a list of results that&#39;s the same size as the list of input tuples. The first element of the result list corresponds to the result for the first input tuple, the second is the result for the second input tuple, and so on. Second, Trident calls execute once for each input tuple, passing it that tuple&#39;s result from batchRetrieve; in this case, execute simply emits the location.</p>
<p>You can see that this code doesn&#39;t take advantage of the batching that Trident does, since it just queries the LocationDB one user id at a time. So a better way to write the LocationDB would be like this:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">LocationDB</span> <span class="kd">implements</span> <span class="n">State</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">beginCommit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">)</span> <span class="o">{</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">commit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">)</span> <span class="o">{</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">setLocationsBulk</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">userIds</span><span class="o">,</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">locations</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// set locations in bulk</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="nf">bulkGetLocations</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">userIds</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// get locations in bulk</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
<p>Then, you can write the QueryLocation function like this:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">QueryLocation</span> <span class="kd">extends</span> <span class="n">BaseQueryFunction</span><span class="o">&lt;</span><span class="n">LocationDB</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="nf">batchRetrieve</span><span class="o">(</span><span class="n">LocationDB</span> <span class="n">state</span><span class="o">,</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">TridentTuple</span><span class="o">&gt;</span> <span class="n">inputs</span><span class="o">)</span> <span class="o">{</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">userIds</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;();</span>
<span class="k">for</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="nl">input:</span> <span class="n">inputs</span><span class="o">)</span> <span class="o">{</span>
<span class="n">userIds</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">input</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">));</span>
<span class="o">}</span>
<span class="k">return</span> <span class="n">state</span><span class="o">.</span><span class="na">bulkGetLocations</span><span class="o">(</span><span class="n">userIds</span><span class="o">);</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="n">String</span> <span class="n">location</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span>
<span class="n">collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">location</span><span class="o">));</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
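<p>This version is much more efficient because it reduces round trips to the database: the locations for a whole batch of tuples are fetched with a single bulkGetLocations call instead of one lookup per tuple.</p>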
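<p>To make the saving concrete, here is a minimal, self-contained sketch. CountingLocationDB is a hypothetical in-memory stand-in for LocationDB (it is not part of Storm), and it simply counts each method call as one round trip. The per-tuple method getLocation and the setter setLocation are assumptions made for illustration.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for LocationDB; every call counts as one round trip.
class CountingLocationDB {
    private final Map<Long, String> locations = new HashMap<>();
    int roundTrips = 0;

    void setLocation(long userId, String location) {
        locations.put(userId, location);
    }

    // Per-tuple lookup: one round trip per user id.
    String getLocation(long userId) {
        roundTrips++;
        return locations.get(userId);
    }

    // Batched lookup: one round trip for the whole list, results in input order.
    List<String> bulkGetLocations(List<Long> userIds) {
        roundTrips++;
        List<String> result = new ArrayList<>(userIds.size());
        for (Long id : userIds) {
            result.add(locations.get(id));
        }
        return result;
    }
}

class RoundTripDemo {
    public static void main(String[] args) {
        CountingLocationDB db = new CountingLocationDB();
        db.setLocation(1L, "Berlin");
        db.setLocation(2L, "Lima");
        db.setLocation(3L, "Osaka");
        List<Long> batch = Arrays.asList(1L, 2L, 3L);

        // Unbatched: one call per user id in the batch.
        for (Long id : batch) {
            db.getLocation(id);
        }
        int perTuple = db.roundTrips;

        // Batched: a single call for the same ids.
        db.roundTrips = 0;
        db.bulkGetLocations(batch);
        int batched = db.roundTrips;

        System.out.println("per-tuple: " + perTuple + ", batched: " + batched);
    }
}
```

<p>With a real database each round trip carries network latency, so the difference grows with the batch size.</p>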
<p>To update state, you make use of the StateUpdater interface. Here&#39;s a StateUpdater that updates a LocationDB with new location information:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">LocationUpdater</span> <span class="kd">extends</span> <span class="n">BaseStateUpdater</span><span class="o">&lt;</span><span class="n">LocationDB</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">updateState</span><span class="o">(</span><span class="n">LocationDB</span> <span class="n">state</span><span class="o">,</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">TridentTuple</span><span class="o">&gt;</span> <span class="n">tuples</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">ids</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;();</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">locations</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;();</span>
<span class="k">for</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="nl">t:</span> <span class="n">tuples</span><span class="o">)</span> <span class="o">{</span>
<span class="n">ids</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">t</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">));</span>
<span class="n">locations</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">t</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="mi">1</span><span class="o">));</span>
<span class="o">}</span>
<span class="n">state</span><span class="o">.</span><span class="na">setLocationsBulk</span><span class="o">(</span><span class="n">ids</span><span class="o">,</span> <span class="n">locations</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
<p>Here&#39;s how you would use this operation in a Trident topology:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TridentTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentTopology</span><span class="o">();</span>
<span class="n">TridentState</span> <span class="n">locations</span> <span class="o">=</span>
<span class="n">topology</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="s">"locations"</span><span class="o">,</span> <span class="n">locationsSpout</span><span class="o">)</span>
<span class="o">.</span><span class="na">partitionPersist</span><span class="o">(</span><span class="k">new</span> <span class="n">LocationDBFactory</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"userid"</span><span class="o">,</span> <span class="s">"location"</span><span class="o">),</span> <span class="k">new</span> <span class="n">LocationUpdater</span><span class="o">());</span>
</code></pre></div>
<p>The partitionPersist operation updates a source of state. The StateUpdater receives the State and a batch of tuples with updates to that State. This code just grabs the userids and locations from the input tuples and does a bulk set into the State. </p>
<p>partitionPersist returns a TridentState object representing the location db being updated by the Trident topology. You could then use this state in stateQuery operations elsewhere in the topology. </p>
<p>You can also see that StateUpdaters are given a TridentCollector. Tuples emitted to this collector go to the &quot;new values stream&quot;. In this case, there&#39;s nothing interesting to emit to that stream, but if you were doing something like updating counts in a database, you could emit the updated counts to that stream. You can then get access to the new values stream for further processing via the TridentState#newValuesStream method.</p>
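<p>As a rough illustration of the count-updating case mentioned above, the following self-contained sketch models an updater that emits the updated counts. It does not use the Storm API: RecordingCollector is a hypothetical stand-in for TridentCollector, CountStore is a hypothetical stand-in for a database-backed State, and each input "tuple" is reduced to a plain word.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TridentCollector: records what would go to the new values stream.
class RecordingCollector {
    final List<List<Object>> emitted = new ArrayList<>();

    void emit(List<Object> values) {
        emitted.add(values);
    }
}

// Hypothetical count store standing in for a State backed by a database.
class CountStore {
    private final Map<String, Long> counts = new HashMap<>();

    // Adds delta to the stored count and returns the updated total.
    long increment(String key, long delta) {
        return counts.merge(key, delta, Long::sum);
    }
}

class CountUpdaterSketch {
    // Mirrors the shape of updateState(state, tuples, collector).
    void updateState(CountStore state, List<String> words, RecordingCollector collector) {
        // Pre-aggregate the batch so each key is written once.
        Map<String, Long> deltas = new LinkedHashMap<>();
        for (String word : words) {
            deltas.merge(word, 1L, Long::sum);
        }
        for (Map.Entry<String, Long> e : deltas.entrySet()) {
            long updated = state.increment(e.getKey(), e.getValue());
            // Emit (key, updated count) for downstream processing.
            collector.emit(Arrays.<Object>asList(e.getKey(), updated));
        }
    }

    public static void main(String[] args) {
        CountStore store = new CountStore();
        RecordingCollector collector = new RecordingCollector();
        new CountUpdaterSketch().updateState(store, Arrays.asList("a", "b", "a"), collector);
        System.out.println(collector.emitted);
    }
}
```

<p>In a real topology the emitted tuples would be read from the stream returned by TridentState#newValuesStream rather than from a list.</p>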
<h2 id="persistentaggregate">persistentAggregate</h2>
<p>Trident has another method for updating States called persistentAggregate. You&#39;ve seen this used in the streaming word count example, shown again below:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TridentTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentTopology</span><span class="o">();</span>
<span class="n">TridentState</span> <span class="n">wordCounts</span> <span class="o">=</span>
<span class="n">topology</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="s">"spout1"</span><span class="o">,</span> <span class="n">spout</span><span class="o">)</span>
<span class="o">.</span><span class="na">each</span><span class="o">(</span><span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"sentence"</span><span class="o">),</span> <span class="k">new</span> <span class="n">Split</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"word"</span><span class="o">))</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"word"</span><span class="o">))</span>
<span class="o">.</span><span class="na">persistentAggregate</span><span class="o">(</span><span class="k">new</span> <span class="n">MemoryMapState</span><span class="o">.</span><span class="na">Factory</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Count</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"count"</span><span class="o">));</span>
</code></pre></div>
<p>persistentAggregate is an additional abstraction built on top of partitionPersist that knows how to take a Trident aggregator and use it to apply updates to the source of state. In this case, since this is a grouped stream, Trident expects the state you provide to implement the &quot;MapState&quot; interface. The grouping fields will be the keys in the state, and the aggregation result will be the values in the state. The &quot;MapState&quot; interface looks like this:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">MapState</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="kd">extends</span> <span class="n">State</span> <span class="o">{</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="nf">multiGet</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">&gt;&gt;</span> <span class="n">keys</span><span class="o">);</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="nf">multiUpdate</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">&gt;&gt;</span> <span class="n">keys</span><span class="o">,</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">ValueUpdater</span><span class="o">&gt;</span> <span class="n">updaters</span><span class="o">);</span>
<span class="kt">void</span> <span class="nf">multiPut</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">&gt;&gt;</span> <span class="n">keys</span><span class="o">,</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">vals</span><span class="o">);</span>
<span class="o">}</span>
</code></pre></div>
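<p>To show how the three methods relate, here is a minimal in-memory sketch with the same method shapes. It is not a Storm class and provides no fault tolerance: UpdaterSketch is a local stand-in for ValueUpdater so the example compiles on its own, and the beginCommit and commit methods of State are omitted.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-in with the same shape as ValueUpdater, so the sketch compiles on its own.
interface UpdaterSketch<T> {
    T update(T stored);
}

// In-memory sketch of the three MapState methods; State's commit hooks are omitted.
class InMemoryMapStateSketch<T> {
    // Keys are lists because a stream can be grouped by several fields.
    private final Map<List<Object>, T> store = new HashMap<>();

    public List<T> multiGet(List<List<Object>> keys) {
        List<T> vals = new ArrayList<>(keys.size());
        for (List<Object> key : keys) {
            vals.add(store.get(key)); // null when the key has no value yet
        }
        return vals;
    }

    // Read the current values, apply the i-th updater to the i-th value, write back.
    public List<T> multiUpdate(List<List<Object>> keys, List<UpdaterSketch<T>> updaters) {
        List<T> current = multiGet(keys);
        List<T> updated = new ArrayList<>(keys.size());
        for (int i = 0; i < keys.size(); i++) {
            updated.add(updaters.get(i).update(current.get(i)));
        }
        multiPut(keys, updated);
        return updated;
    }

    public void multiPut(List<List<Object>> keys, List<T> vals) {
        for (int i = 0; i < keys.size(); i++) {
            store.put(keys.get(i), vals.get(i));
        }
    }
}
```

<p>For the word count above, each key would be a one-element list holding the word, and each updater would fold the batch&#39;s partial count into the stored count.</p>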
<p>When you do aggregations on non-grouped streams (a global aggregation), Trident expects your State object to implement the &quot;Snapshottable&quot; interface:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">Snapshottable</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="kd">extends</span> <span class="n">State</span> <span class="o">{</span>
<span class="n">T</span> <span class="nf">get</span><span class="o">();</span>
<span class="n">T</span> <span class="nf">update</span><span class="o">(</span><span class="n">ValueUpdater</span> <span class="n">updater</span><span class="o">);</span>
<span class="kt">void</span> <span class="nf">set</span><span class="o">(</span><span class="n">T</span> <span class="n">o</span><span class="o">);</span>
<span class="o">}</span>
</code></pre></div>
<p><a href="http://github.com/apache/storm/blob/v2.1.0/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryMapState.java">MemoryMapState</a> and <a href="https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java">MemcachedState</a> each implement both of these interfaces.</p>
<h2 id="implementing-map-states">Implementing Map States</h2>
<p>Trident makes it easy to implement MapStates by doing almost all the work for you. The OpaqueMap, TransactionalMap, and NonTransactionalMap classes implement the logic for their respective fault-tolerance semantics. You simply provide these classes with an IBackingMap implementation that knows how to do multiGets and multiPuts of the respective keys and values. IBackingMap looks like this:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">IBackingMap</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="nf">multiGet</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">&gt;&gt;</span> <span class="n">keys</span><span class="o">);</span>
<span class="kt">void</span> <span class="nf">multiPut</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">&gt;&gt;</span> <span class="n">keys</span><span class="o">,</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">vals</span><span class="o">);</span>
<span class="o">}</span>
</code></pre></div>
<p>OpaqueMap will call multiPut with <a href="http://github.com/apache/storm/blob/v2.1.0/storm-client/src/jvm/org/apache/storm/trident/state/OpaqueValue.java">OpaqueValue</a> objects for the vals, TransactionalMap will pass <a href="http://github.com/apache/storm/blob/v2.1.0/storm-client/src/jvm/org/apache/storm/trident/state/TransactionalValue.java">TransactionalValue</a> objects for the vals, and NonTransactionalMap will just pass the objects from the topology through.</p>
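<p>The division of labor can be sketched as follows. This is a simplified model under stated assumptions, not the Storm implementation: BackingMapSketch mirrors the two IBackingMap methods, TxValueSketch is a reduced stand-in for TransactionalValue (a value plus the txid that wrote it), and TransactionalWrapperSketch shows the kind of logic a transactional wrapper layers on top, using UnaryOperator in place of ValueUpdater.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Local stand-in with the same two methods as IBackingMap.
interface BackingMapSketch<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

// Simplified stand-in for TransactionalValue: the value plus the txid that wrote it.
class TxValueSketch<T> {
    final long txid;
    final T val;

    TxValueSketch(long txid, T val) {
        this.txid = txid;
        this.val = val;
    }
}

// The only storage-specific part: plain gets and puts, here against a HashMap.
class HashBackingMap<T> implements BackingMapSketch<T> {
    private final Map<List<Object>, T> data = new HashMap<>();

    public List<T> multiGet(List<List<Object>> keys) {
        List<T> vals = new ArrayList<>(keys.size());
        for (List<Object> key : keys) {
            vals.add(data.get(key));
        }
        return vals;
    }

    public void multiPut(List<List<Object>> keys, List<T> vals) {
        for (int i = 0; i < keys.size(); i++) {
            data.put(keys.get(i), vals.get(i));
        }
    }
}

// Sketch of transactional logic layered on top of any backing map.
class TransactionalWrapperSketch<T> {
    private final BackingMapSketch<TxValueSketch<T>> backing;
    private long currentTxid;

    TransactionalWrapperSketch(BackingMapSketch<TxValueSketch<T>> backing) {
        this.backing = backing;
    }

    void beginCommit(long txid) {
        currentTxid = txid;
    }

    List<T> multiUpdate(List<List<Object>> keys, List<UnaryOperator<T>> updaters) {
        List<TxValueSketch<T>> stored = backing.multiGet(keys);
        List<T> result = new ArrayList<>(keys.size());
        List<List<Object>> changedKeys = new ArrayList<>();
        List<TxValueSketch<T>> changedVals = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            TxValueSketch<T> old = stored.get(i);
            if (old != null && old.txid == currentTxid) {
                // This batch was already applied (a replay): keep the stored value.
                result.add(old.val);
            } else {
                T updated = updaters.get(i).apply(old == null ? null : old.val);
                result.add(updated);
                changedKeys.add(keys.get(i));
                changedVals.add(new TxValueSketch<>(currentTxid, updated));
            }
        }
        // The backing map only ever sees wrapped values.
        backing.multiPut(changedKeys, changedVals);
        return result;
    }
}
```

<p>The backing map stays unaware of transactions: swapping HashBackingMap for a database-backed implementation would leave the wrapper unchanged.</p>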
<p>Trident also provides the <a href="http://github.com/apache/storm/blob/v2.1.0/storm-client/src/jvm/org/apache/storm/trident/state/map/CachedMap.java">CachedMap</a> class to do automatic LRU caching of map key/vals.</p>
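<p>The idea behind such a cache can be sketched with the JDK alone. The code below is an illustration of LRU caching in front of a key/value backend, not the CachedMap source: KeyValueBackend mirrors the two IBackingMap methods, CountingBackend is a hypothetical backend that counts fetched keys, and the write-through behavior of multiPut is an assumption of this sketch.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local stand-in with the same two methods as IBackingMap.
interface KeyValueBackend<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

// Hypothetical backend that counts how many keys were actually fetched from it.
class CountingBackend<T> implements KeyValueBackend<T> {
    private final Map<List<Object>, T> data = new HashMap<>();
    int keysFetched = 0;

    public List<T> multiGet(List<List<Object>> keys) {
        List<T> vals = new ArrayList<>(keys.size());
        for (List<Object> key : keys) {
            keysFetched++;
            vals.add(data.get(key));
        }
        return vals;
    }

    public void multiPut(List<List<Object>> keys, List<T> vals) {
        for (int i = 0; i < keys.size(); i++) {
            data.put(keys.get(i), vals.get(i));
        }
    }
}

// LRU read cache in front of a backend; writes go to both the backend and the cache.
class LruCachingBackend<T> implements KeyValueBackend<T> {
    private final KeyValueBackend<T> delegate;
    private final Map<List<Object>, T> cache;

    LruCachingBackend(KeyValueBackend<T> delegate, final int capacity) {
        this.delegate = delegate;
        // accessOrder=true keeps entries ordered from least to most recently used.
        this.cache = new LinkedHashMap<List<Object>, T>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<List<Object>, T> eldest) {
                return size() > capacity;
            }
        };
    }

    public List<T> multiGet(List<List<Object>> keys) {
        List<T> result = new ArrayList<>(keys.size());
        List<Integer> missIndexes = new ArrayList<>();
        List<List<Object>> missKeys = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            List<Object> key = keys.get(i);
            if (cache.containsKey(key)) {
                result.add(cache.get(key)); // get() also marks the entry as recently used
            } else {
                result.add(null); // placeholder, filled in below
                missIndexes.add(i);
                missKeys.add(key);
            }
        }
        if (!missKeys.isEmpty()) {
            List<T> fetched = delegate.multiGet(missKeys); // one bulk call for all misses
            for (int j = 0; j < missKeys.size(); j++) {
                result.set(missIndexes.get(j), fetched.get(j));
                cache.put(missKeys.get(j), fetched.get(j));
            }
        }
        return result;
    }

    public void multiPut(List<List<Object>> keys, List<T> vals) {
        delegate.multiPut(keys, vals);
        for (int i = 0; i < keys.size(); i++) {
            cache.put(keys.get(i), vals.get(i));
        }
    }
}
```

<p>Because the cache implements the same interface as the backend it wraps, it can be slotted in between a map wrapper and the real store without either side changing.</p>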
<p>Finally, Trident provides the <a href="http://github.com/apache/storm/blob/v2.1.0/storm-client/src/jvm/org/apache/storm/trident/state/map/SnapshottableMap.java">SnapshottableMap</a> class that turns a MapState into a Snapshottable object, by storing global aggregations into a fixed key.</p>
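<p>The fixed-key technique can be sketched like this. The classes below are illustrative stand-ins, not Storm classes: KeyedStoreSketch plays the role of the MapState, FixedKeySnapshotSketch exposes the get, update, and set operations of Snapshottable, and UnaryOperator is used in place of ValueUpdater. The key name "$GLOBAL$" in the usage note is an arbitrary choice for this example.</p>

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Minimal keyed store standing in for a MapState (hypothetical, in-memory).
class KeyedStoreSketch<T> {
    private final Map<List<Object>, T> data = new HashMap<>();

    List<T> multiGet(List<List<Object>> keys) {
        List<T> vals = new ArrayList<>(keys.size());
        for (List<Object> key : keys) {
            vals.add(data.get(key));
        }
        return vals;
    }

    void multiPut(List<List<Object>> keys, List<T> vals) {
        for (int i = 0; i < keys.size(); i++) {
            data.put(keys.get(i), vals.get(i));
        }
    }
}

// Exposes get/update/set for one global value by always using the same key.
class FixedKeySnapshotSketch<T> {
    private final KeyedStoreSketch<T> store;
    private final List<List<Object>> keys; // always exactly one key

    FixedKeySnapshotSketch(KeyedStoreSketch<T> store, List<Object> snapshotKey) {
        this.store = store;
        this.keys = Collections.singletonList(snapshotKey);
    }

    T get() {
        return store.multiGet(keys).get(0);
    }

    T update(UnaryOperator<T> updater) {
        T updated = updater.apply(get());
        set(updated);
        return updated;
    }

    void set(T value) {
        store.multiPut(keys, Collections.singletonList(value));
    }
}
```

<p>A global count could then be held with new FixedKeySnapshotSketch&lt;Long&gt;(store, Collections.&lt;Object&gt;singletonList("$GLOBAL$")), sharing the same underlying store as the grouped aggregations.</p>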
<p>Take a look at the implementation of <a href="https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java">MemcachedState</a> to see how all these utilities can be put together to make a high-performance MapState implementation. MemcachedState allows you to choose between opaque transactional, transactional, and non-transactional semantics.</p>
</div>
</div>
</div>
</div>
<footer>
<div class="container-fluid">
<div class="row">
<div class="col-md-12">
<p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved.
<br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation.
<br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
</div>
</div>
</div>
</footer>
<!--Footer End-->
</body>
</html>