<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Effectively Once Java Topologies · Apache Heron</title><meta name="viewport" content="width=device-width"/><meta name="generator" content="Docusaurus"/><meta name="description" content="&lt;!--"/><meta name="docsearch:version" content="0.20.1-incubating"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Effectively Once Java Topologies · Apache Heron"/><meta property="og:type" content="website"/><meta property="og:url" content="https://heron.apache.org/"/><meta property="og:description" content="&lt;!--"/><meta property="og:image" content="https://heron.apache.org/img/undraw_online.svg"/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://heron.apache.org/img/undraw_tweetstorm.svg"/><link rel="shortcut icon" href="/img/favicon-32x32.png"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/default.min.css"/><link rel="alternate" type="application/atom+xml" href="https://heron.apache.org/blog/atom.xml" title="Apache Heron Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://heron.apache.org/blog/feed.xml" title="Apache Heron Blog RSS Feed"/><script>
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-198017384-1', 'auto');
ga('send', 'pageview');
</script><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="/js/custom.js"></script><script type="text/javascript" src="/js/fix-location.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/"><img class="logo" src="/img/HeronTextLogo-small.png" alt="Apache Heron"/><h2 class="headerTitleWithLogo">Apache Heron</h2></a><a href="/versions"><h3>0.20.1-incubating</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class=""><a href="/api/java" target="_self">Javadocs</a></li><li class=""><a href="/api/python" target="_self">Pydocs</a></li><li class="siteNavGroupActive"><a href="/docs/0.20.1-incubating/getting-started-local-single-node" target="_self">Docs</a></li><li class=""><a href="/download" target="_self">Downloads</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#apache" target="_self">Apache</a></li></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="container docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Guides</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Getting Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/getting-started-local-single-node">Local 
(Single Node)</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/getting-started-migrate-storm-topologies">Migrate Storm Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/getting-started-troubleshooting-guide">Troubleshooting Guide</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/deployment-overview">Deployment Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/deployment-configuration">Configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/deployment-api-server">The Heron API Server</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Topology Development APIs</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/topology-development-streamlet-api">The Heron Streamlet API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/topology-development-eco-api">The ECO API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/topology-development-topology-api-java">The Heron Topology API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/topology-development-topology-api-python">The Heron Topology API for Python</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/topology-development-streamlet-scala">The Heron Streamlet API for Scala</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client API Docs</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/client-api-docs-overview">Client API Docs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Guides</h3><ul class=""><li class="navListItem navListItemActive"><a 
class="navItem" href="/docs/0.20.1-incubating/guides-effectively-once-java-topologies">Effectively Once Java Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-data-model">Heron Data Model</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-tuple-serialization">Tuple Serialization</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-ui-guide">Heron UI Guide</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-topology-tuning">Topology Tuning Guide</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-packing-algorithms">Packing Algorithms</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-simulator-mode">Simulator Mode</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-troubeshooting-guide">Topology Troubleshooting Guide</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Heron Concepts</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/heron-design-goals">Heron Design Goals</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/heron-topology-concepts">Heron Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/heron-streamlet-concepts">Heron Streamlets</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/heron-architecture">Heron Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/heron-delivery-semantics">Heron Delivery Semantics</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">State Managers</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/state-managers-zookeeper">Zookeeper</a></li><li class="navListItem"><a class="navItem" 
href="/docs/0.20.1-incubating/state-managers-local-fs">Local File System</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Uploaders</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/uploaders-local-fs">Local File System</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/uploaders-hdfs">HDFS</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/uploaders-http">HTTP</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/uploaders-amazon-s3">Amazon S3</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/uploaders-scp">Secure Copy (SCP)</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Schedulers</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-k8s-by-hand">Kubernetes by hand</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-k8s-with-helm">Kubernetes with Helm</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-aurora-cluster">Aurora Cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-aurora-local">Aurora Locally</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-local">Local Cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-nomad">Nomad</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-mesos-local-mac">Mesos Cluster Locally</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-slurm">Slurm Cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-yarn">YARN Cluster</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cluster Configuration</h3><ul class=""><li 
class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/cluster-config-overview">Cluster Config Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/cluster-config-system-level">System Level Configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/cluster-config-instance">Heron Instance</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/cluster-config-metrics">Metrics Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/cluster-config-stream">Stream Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/cluster-config-tmanager">Topology Manager</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Observability</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/observability-prometheus">Prometheus</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/observability-graphite">Graphite</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/observability-scribe">Scribe</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">User Manuals</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/user-manuals-heron-cli">Heron Client</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/user-manuals-heron-explorer">Heron Explorer</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/user-manuals-tracker-rest">Heron Tracker REST API</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/user-manuals-heron-tracker-runbook">Heron Tracker Runbook</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/user-manuals-heron-ui-runbook">Heron UI Runbook</a></li><li class="navListItem"><a class="navItem" 
href="/docs/0.20.1-incubating/user-manuals-heron-shell">Heron Shell</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Compiling</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/compiling-overview">Compiling Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/compiling-linux">Compiling on Linux</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/compiling-osx">Compiling on OS X</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/compiling-docker">Compiling With Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/compiling-running-tests">Running Tests</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/compiling-code-organization">Code Organization</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Extending Heron</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/extending-heron-scheduler">Custom Scheduler</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/extending-heron-metric-sink">Custom Metrics Sink</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Heron Resources</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/heron-resources-resources">Heron Resources</a></li></ul></div></div></section></div><script>
var coll = document.getElementsByClassName('collapsible');
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
var links = coll[i].nextElementSibling.getElementsByTagName('*');
if (checkActiveCategory){
for (var j = 0; j < links.length; j++) {
if (links[j].classList.contains('navListItemActive')){
coll[i].nextElementSibling.classList.toggle('hide');
coll[i].childNodes[1].classList.toggle('rotate');
checkActiveCategory = false;
break;
}
}
}
coll[i].addEventListener('click', function() {
var arrow = this.childNodes[1];
arrow.classList.toggle('rotate');
var content = this.nextElementSibling;
content.classList.toggle('hide');
});
}
document.addEventListener('DOMContentLoaded', function() {
createToggler('#navToggler', '#docsNav', 'docsSliderActive');
createToggler('#tocToggler', 'body', 'tocActive');
var headings = document.querySelector('.toc-headings');
headings && headings.addEventListener('click', function(event) {
var el = event.target;
while(el !== headings){
if (el.tagName === 'A') {
document.body.classList.remove('tocActive');
break;
} else{
el = el.parentNode;
}
}
}, false);
function createToggler(togglerSelector, targetSelector, className) {
var toggler = document.querySelector(togglerSelector);
var target = document.querySelector(targetSelector);
if (!toggler) {
return;
}
toggler.onclick = function(event) {
event.preventDefault();
target.classList.toggle(className);
};
}
});
</script></nav></div><div class="container mainContainer"><div class="wrapper"><div class="post"><header class="postHeader"><h1 class="postHeaderTitle">Effectively Once Java Topologies</h1></header><article><div><span><!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<blockquote>
<p><strong>This document pertains to the older, Storm-based, Heron Topology API.</strong> Heron now offers several APIs for building topologies. Topologies created using the Topology API can still run on Heron, and there are currently no plans to deprecate this API. We would, however, recommend that you use the Streamlet API for future work.</p>
</blockquote>
<p>You can create Heron topologies that have <a href="heron-delivery-semantics#stateful-topologies">effectively-once</a> semantics by doing two things:</p>
<ol>
<li>Set the <a href="#specifying-delivery-semantics">delivery semantics</a> of the topology to <code>EFFECTIVELY_ONCE</code>.</li>
<li>Make every component of the topology (i.e., every spout and bolt) implement the <a href="/api/java/org/apache/heron/api/topology/IStatefulComponent.html"><code>IStatefulComponent</code></a> interface.</li>
</ol>
<h2><a class="anchor" aria-hidden="true" id="specifying-delivery-semantics"></a><a href="#specifying-delivery-semantics" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Specifying delivery semantics</h2>
<p>You can specify the <a href="heron-delivery-semantics">delivery semantics</a> of a Heron topology via configuration. To apply effectively-once semantics to a topology:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.heron.api.Config;
Config topologyConfig = <span class="hljs-keyword">new</span> Config();
<span class="hljs-comment">// Request effectively-once delivery semantics for the whole topology</span>
topologyConfig.setTopologyReliabilityMode(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE);
</code></pre>
<p>The other possible values for the <code>TopologyReliabilityMode</code> enum are <code>ATMOST_ONCE</code> and <code>ATLEAST_ONCE</code>.</p>
<blockquote>
<p>The original Heron Topology API uses the term &quot;reliability mode&quot; rather than &quot;delivery semantics&quot;. The two terms are synonymous.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="stateful-components"></a><a href="#stateful-components" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Stateful components</h2>
<p>Stateful spouts and bolts need to implement the <a href="/api/java/org/apache/heron/api/topology/IStatefulComponent.html"><code>IStatefulComponent</code></a> interface, which requires implementing two methods, both of which return <code>void</code>:</p>
<table>
<thead>
<tr><th style="text-align:left">Method</th><th style="text-align:left">Input</th><th style="text-align:left">Description</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>preSave</code></td><td style="text-align:left">Checkpoint ID (<code>String</code>)</td><td style="text-align:left">The action taken immediately prior to the component's state being saved.</td></tr>
<tr><td style="text-align:left"><code>initState</code></td><td style="text-align:left">Initial state (<a href="/api/java/org/apache/heron/examples/api/StatefulWordCountTopology.ConsumerBolt.html#initState-org.apache.heron.api.state.State-"><code>State&lt;K, V&gt;</code></a>)</td><td style="text-align:left">Initializes the component's state to that of a previous checkpoint.</td></tr>
</tbody>
</table>
<blockquote>
<p>Remember that stateful components automatically handle all state storage in the background using a State Manager (the currently available State Managers are <a href="state-managers-zookeeper">ZooKeeper</a> and the <a href="state-managers-local-fs">local filesystem</a>). You don't need to save state yourself, for example to an external database.</p>
</blockquote>
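<p>The checkpoint-and-restore cycle behind these two methods can be illustrated without Heron at all. The sketch below is a simplified, single-process model, not Heron's implementation: <code>CheckpointedSummer</code> and <code>ReplayDemo</code> are hypothetical classes, a plain <code>HashMap</code> stands in for <code>State&lt;String, Integer&gt;</code>, <code>preSave</code>/<code>initState</code> only mirror the <code>IStatefulComponent</code> method names, and the "checkpoint" is just a copied map. It shows why rolling state back to the last checkpoint and replaying the tuples that arrived after it gives the same result as a failure-free run.</p>

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a stateful bolt. It is NOT the Heron API: a HashMap
// replaces State<String, Integer>, and the method names only mirror IStatefulComponent.
final class CheckpointedSummer {
    private Map<String, Integer> state = new HashMap<>();

    // Mirrors initState: adopt whatever state the "framework" hands over.
    void initState(Map<String, Integer> restored) {
        state = restored;
    }

    // Mirrors preSave: last chance to touch the state before it is copied.
    void preSave(String checkpointId) {
        // Nothing is buffered in this sketch, so there is no work to do.
    }

    // Processes one tuple by adding its value to the running sum.
    void execute(int value) {
        state.merge("count", value, Integer::sum);
    }

    int sum() {
        return state.getOrDefault("count", 0);
    }

    // Simulated checkpoint: call preSave, then take a defensive copy of the state.
    Map<String, Integer> snapshot(String checkpointId) {
        preSave(checkpointId);
        return new HashMap<>(state);
    }
}

final class ReplayDemo {
    // Processes inputs[0, checkpointAt), checkpoints, keeps processing up to crashAt,
    // then "crashes", restores the checkpoint, and replays from checkpointAt.
    // Assumes 0 <= checkpointAt <= crashAt <= inputs.size().
    static int runWithFailure(List<Integer> inputs, int checkpointAt, int crashAt) {
        CheckpointedSummer bolt = new CheckpointedSummer();
        bolt.initState(new HashMap<>()); // fresh start: empty state
        for (int i = 0; i < checkpointAt; i++) {
            bolt.execute(inputs.get(i));
        }
        Map<String, Integer> checkpoint = bolt.snapshot("ckpt-1");
        for (int i = checkpointAt; i < crashAt; i++) {
            bolt.execute(inputs.get(i)); // this work is lost in the crash
        }
        // Recovery: a new instance is initialized from the checkpoint, and every
        // tuple that arrived after the checkpoint is replayed onto that state.
        CheckpointedSummer restarted = new CheckpointedSummer();
        restarted.initState(new HashMap<>(checkpoint));
        for (int i = checkpointAt; i < inputs.size(); i++) {
            restarted.execute(inputs.get(i));
        }
        return restarted.sum();
    }
}
```

<p>With inputs 5, 10, 20, 40, 80, a checkpoint after two tuples, and a crash after four, the restored instance ends at 155, the same as a failure-free run. Replaying 20, 40, and 80 onto the un-restored state (75) would instead have produced 215, counting 20 and 40 twice.</p>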
<h2><a class="anchor" aria-hidden="true" id="the-state-class"></a><a href="#the-state-class" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>The <code>State</code> class</h2>
<p>Heron topologies with effectively-once semantics need to be stateful topologies (you can also create stateful topologies with at-least-once or at-most-once semantics). All state in stateful topologies is handled through a <a href="/api/java/org/apache/heron/api/state/State.html"><code>State</code></a> class, which behaves like a standard Java <a href="https://docs.oracle.com/javase/8/docs/api/java/util/Map.html"><code>Map</code></a> and so includes methods like <code>get</code>, <code>put</code>, <code>putIfAbsent</code>, <code>keySet</code>, <code>compute</code>, <code>forEach</code>, <code>merge</code>, and so on.</p>
<p>Each stateful spout or bolt is associated with a single <code>State</code> object that holds its state. That object is typed as <code>State&lt;K, V&gt;</code>, for example <code>State&lt;String, Integer&gt;</code> or <code>State&lt;Long, MyPojo&gt;</code>; as with any Java generic type, the key and value types must be reference types such as <code>Long</code>, not primitives such as <code>long</code>. An example usage of the state object can be found in the <a href="#example-effectively-once-topology">example topology</a> below.</p>
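<p>Because state is accessed through <code>Map</code>-style methods, ordinary <code>java.util.Map</code> idioms carry over. The sketch below uses a <code>HashMap</code> as a stand-in for a <code>State&lt;Long, Integer&gt;</code> that counts events per ID; <code>PerKeyCounts</code> is a hypothetical helper, not part of the Heron API, and it assumes only the standard <code>Map</code> methods named above.</p>

```java
import java.util.HashMap;
import java.util.Map;

// A HashMap stands in for Heron's State<Long, Integer> to show Map-style calls;
// PerKeyCounts is a hypothetical helper, not part of the Heron API.
final class PerKeyCounts {
    // Keys are boxed Longs: State<long, Integer> would not compile, because
    // Java generics cannot be parameterized with primitive types.
    private final Map<Long, Integer> state = new HashMap<>();

    // Records one event for the given ID and returns the updated count.
    int record(long id) {
        state.putIfAbsent(id, 0);                  // create the entry on first sight
        return state.compute(id, (k, v) -> v + 1); // then increment it in place
    }

    // Count for an ID, or 0 if it has never been seen.
    int countFor(long id) {
        return state.getOrDefault(id, 0);
    }

    // Number of distinct IDs stored so far.
    int distinctIds() {
        return state.keySet().size();
    }
}
```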
<h2><a class="anchor" aria-hidden="true" id="example-effectively-once-topology"></a><a href="#example-effectively-once-topology" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example effectively-once topology</h2>
<p>In the sections below, we'll build a stateful topology with effectively-once semantics from scratch. The topology will work like this:</p>
<ul>
<li>A <a href="#example-stateful-spout"><code>RandomIntSpout</code></a> will continuously emit random integers between 1 and 100</li>
<li>An <a href="#example-stateful-bolt"><code>AdditionBolt</code></a> will receive those random numbers and add each number to a running sum. When the sum exceeds 1,000,000, it will go back to zero. The bolt won't emit any data but will simply log the current sum.</li>
</ul>
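<p>The bolt's update rule can be checked in isolation. This sketch assumes, as the bolt code below does, that the running sum is stored under the key <code>"count"</code> and resets only when it exceeds (not merely reaches) 1,000,000. <code>RunningSum</code> is a hypothetical helper, a <code>HashMap</code> stands in for Heron's <code>State</code>, and the read-modify-write is written as a single <code>Map.compute</code> call rather than a separate read and write.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the AdditionBolt's update rule; a HashMap stands in
// for State<String, Integer>.
final class RunningSum {
    static final int LIMIT = 1_000_000;

    // Adds `incoming` to the sum stored under "count", resetting to zero when
    // the new sum would exceed LIMIT. Returns the value now stored.
    static int add(Map<String, Integer> state, int incoming) {
        return state.compute("count", (key, current) -> {
            int newSum = (current == null ? 0 : current) + incoming;
            return newSum > LIMIT ? 0 : newSum;
        });
    }
}
```

<p>A sum of exactly 1,000,000 is kept; only the next addition that pushes it past the limit stores zero.</p>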
<blockquote>
<p>You can see the code for another stateful Heron topology with effectively-once semantics in <a href="https://github.com/apache/incubator-heron/blob/master/examples/src/java/org/apache/heron/examples/api/StatefulWordCountTopology.java">this word count example</a>.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="example-stateful-spout"></a><a href="#example-stateful-spout" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example stateful spout</h3>
<p>The <code>RandomIntSpout</code> shown below emits a never-ending stream of random integers between 1 and 100 in the <code>random-int</code> field.</p>
<blockquote>
<p>It's important to note that <em>all</em> components in stateful topologies must be stateful (i.e., implement the <code>IStatefulComponent</code> interface) for the topology to provide effectively-once semantics. That includes spouts, even simple ones like the spout in this example.</p>
</blockquote>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.heron.api.spout.BaseRichSpout;
<span class="hljs-keyword">import</span> org.apache.heron.api.spout.SpoutOutputCollector;
<span class="hljs-keyword">import</span> org.apache.heron.api.state.State;
<span class="hljs-keyword">import</span> org.apache.heron.api.topology.IStatefulComponent;
<span class="hljs-keyword">import</span> org.apache.heron.api.topology.OutputFieldsDeclarer;
<span class="hljs-keyword">import</span> org.apache.heron.api.topology.TopologyContext;
<span class="hljs-keyword">import</span> org.apache.heron.api.tuple.Fields;
<span class="hljs-keyword">import</span> org.apache.heron.api.tuple.Values;
<span class="hljs-keyword">import</span> java.util.Map;
<span class="hljs-keyword">import</span> java.util.concurrent.ThreadLocalRandom;
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">RandomIntSpout</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">BaseRichSpout</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">IStatefulComponent</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Integer</span>&gt; </span>{
<span class="hljs-keyword">private</span> SpoutOutputCollector spoutOutputCollector;
<span class="hljs-keyword">private</span> State&lt;String, Integer&gt; count;
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">RandomIntSpout</span><span class="hljs-params">()</span> </span>{
}
<span class="hljs-comment">// Generates a random integer between 1 and 100</span>
<span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">int</span> <span class="hljs-title">randomInt</span><span class="hljs-params">()</span> </span>{
<span class="hljs-keyword">return</span> ThreadLocalRandom.current().nextInt(<span class="hljs-number">1</span>, <span class="hljs-number">101</span>);
}
<span class="hljs-comment">// These two methods are required to implement the IStatefulComponent interface</span>
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">preSave</span><span class="hljs-params">(String checkpointId)</span> </span>{
System.out.println(String.format(<span class="hljs-string">"Saving spout state at checkpoint %s"</span>, checkpointId));
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">initState</span><span class="hljs-params">(State&lt;String, Integer&gt; state)</span> </span>{
count = state;
}
<span class="hljs-comment">// These three methods are required to extend the BaseRichSpout abstract class</span>
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">open</span><span class="hljs-params">(Map&lt;String, Object&gt; map, TopologyContext ctx, SpoutOutputCollector collector)</span> </span>{
spoutOutputCollector = collector;
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">declareOutputFields</span><span class="hljs-params">(OutputFieldsDeclarer declarer)</span> </span>{
declarer.declare(<span class="hljs-keyword">new</span> Fields(<span class="hljs-string">"random-int"</span>));
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">nextTuple</span><span class="hljs-params">()</span> </span>{
<span class="hljs-keyword">int</span> randomInt = randomInt();
spoutOutputCollector.emit(<span class="hljs-keyword">new</span> Values(randomInt));
}
}
</code></pre>
<p>A few things to note in this spout:</p>
<ul>
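<li>The spout's state is held in the <code>count</code> variable, which is of type <code>State&lt;String, Integer&gt;</code>. This spout never writes to it; it implements <code>IStatefulComponent</code> only so that the topology as a whole can provide effectively-once semantics. The <code>AdditionBolt</code> below uses the same kind of state object, storing the running sum under the key <code>count</code>.</li>
<li>This is a very simple topology, so the <code>preSave</code> method simply logs the current checkpoint ID. Because <code>preSave</code> runs immediately before the state is saved, a more complex component could use it to, for example, copy buffered in-memory data into its <code>State</code> object so that the data is included in the checkpoint.</li>
<li>The <code>initState</code> method simply stores a reference to the state object it receives. This method can be used for a wide variety of purposes, for example deserializing the <code>State</code> object to a user-defined type.</li>
<li>The spout declares a single output field: <code>random-int</code>.</li>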
</ul>
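<p>The <code>preSave</code> hook can do more than log. One hypothetical use, sketched below, is to count emitted tuples in a plain <code>int</code> field on the hot path and fold that count into the state only when a checkpoint is about to be taken. <code>BufferedEmitCounter</code> is an illustrative class, not Heron API; a <code>HashMap</code> stands in for <code>State&lt;String, Integer&gt;</code>, and the sketch assumes, per the table above, that <code>preSave</code> runs immediately before the state is saved.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical component that keeps a fast local counter and copies it into
// its state only when a checkpoint is about to be taken. A HashMap stands in
// for Heron's State<String, Integer>; method names mirror IStatefulComponent.
final class BufferedEmitCounter {
    private Map<String, Integer> state = new HashMap<>();
    private int pending; // tuples emitted since the last flush

    void initState(Map<String, Integer> restored) {
        state = restored;
        pending = 0; // nothing unflushed survives a restart
    }

    // Called on every emit: touches only a primitive field.
    void onEmit() {
        pending++;
    }

    // Called just before the state is saved: fold the buffered count into the
    // state so the checkpoint reflects everything emitted so far.
    void preSave(String checkpointId) {
        state.merge("emitted", pending, Integer::sum);
        pending = 0;
    }

    // Flushed total plus anything still buffered.
    int totalEmitted() {
        return state.getOrDefault("emitted", 0) + pending;
    }
}
```

<p>The trade-off is that the map is updated once per checkpoint instead of once per tuple, while anything still in <code>pending</code> at the time of a failure is discarded; that is consistent with the state rolling back to the last checkpoint anyway.</p>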
<h3><a class="anchor" aria-hidden="true" id="example-stateful-bolt"></a><a href="#example-stateful-bolt" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example stateful bolt</h3>
<p>The <code>AdditionBolt</code> takes incoming tuples from the <code>RandomIntSpout</code> and adds each integer to produce a running sum. If the sum ever exceeds 1 million, then it resets to zero.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.heron.api.bolt.BaseRichBolt;
<span class="hljs-keyword">import</span> org.apache.heron.api.bolt.OutputCollector;
<span class="hljs-keyword">import</span> org.apache.heron.api.state.State;
<span class="hljs-keyword">import</span> org.apache.heron.api.topology.IStatefulComponent;
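<span class="hljs-keyword">import</span> org.apache.heron.api.topology.OutputFieldsDeclarer;
<span class="hljs-keyword">import</span> org.apache.heron.api.topology.TopologyContext;
<span class="hljs-keyword">import</span> org.apache.heron.api.tuple.Tuple;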
<span class="hljs-keyword">import</span> java.util.Map;
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">AdditionBolt</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">BaseRichBolt</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">IStatefulComponent</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Integer</span>&gt; </span>{
<span class="hljs-keyword">private</span> OutputCollector outputCollector;
<span class="hljs-keyword">private</span> State&lt;String, Integer&gt; count;
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">AdditionBolt</span><span class="hljs-params">()</span> </span>{
}
<span class="hljs-comment">// These two methods are required to implement the IStatefulComponent interface</span>
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">preSave</span><span class="hljs-params">(String checkpointId)</span> </span>{
System.out.println(String.format(<span class="hljs-string">"Saving bolt state at checkpoint %s"</span>, checkpointId));
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">initState</span><span class="hljs-params">(State&lt;String, Integer&gt; state)</span> </span>{
count = state;
}
<span class="hljs-comment">// These three methods are required to extend the BaseRichBolt abstract class</span>
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">prepare</span><span class="hljs-params">(Map&lt;String, Object&gt; heronConf, TopologyContext ctx, OutputCollector collector)</span> </span>{
outputCollector = collector;
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">declareOutputFields</span><span class="hljs-params">(OutputFieldsDeclarer declarer)</span> </span>{
<span class="hljs-comment">// This bolt has no output fields, so none will be declared</span>
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">execute</span><span class="hljs-params">(Tuple tuple)</span> </span>{
<span class="hljs-comment">// Extract the incoming random integer from the arriving tuple</span>
<span class="hljs-keyword">int</span> incomingRandomInt = tuple.getInt(tuple.fieldIndex(<span class="hljs-string">"random-int"</span>));
<span class="hljs-comment">// Get the current sum from the count object, defaulting to zero in case</span>
<span class="hljs-comment">// this is the first processing operation.</span>
<span class="hljs-keyword">int</span> currentSum = count.getOrDefault(<span class="hljs-string">"count"</span>, <span class="hljs-number">0</span>);
<span class="hljs-keyword">int</span> newSum = incomingRandomInt + currentSum;
<span class="hljs-comment">// Reset the sum to zero if it exceeds 1,000,000</span>
<span class="hljs-keyword">if</span> (newSum &gt; <span class="hljs-number">1000000</span>) {
newSum = <span class="hljs-number">0</span>;
}
<span class="hljs-comment">// Update the count state</span>
count.put(<span class="hljs-string">"count"</span>, newSum);
System.out.println(String.format(<span class="hljs-string">"The current saved sum is: %d"</span>, newSum));
}
}
</code></pre>
<p>A few things to notice in this bolt:</p>
<ul>
<li>As in the <code>RandomIntSpout</code>, all of the bolt's state lives in the <code>count</code> variable, which is of type <code>State&lt;String, Integer&gt;</code>. That state object holds a single key, <code>"count"</code>, whose value is the current sum.</li>
<li>As in the <code>RandomIntSpout</code>, the <code>preSave</code> method simply logs the current checkpoint ID.</li>
<li>The bolt has no output (it simply logs the current stored sum), so no output fields need to be declared.</li>
</ul>
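<p>Because the bolt's update rule does not depend on any Heron machinery, it can be tried out in plain Java. The following is a minimal sketch, not part of the example repository: a <code>HashMap</code> stands in for the <code>State&lt;String, Integer&gt;</code> object (the bolt only calls the Map-style <code>getOrDefault</code> and <code>put</code> methods on it), and the <code>RunningSumSketch</code> class and its <code>apply</code> method are hypothetical names.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of AdditionBolt's update rule. A HashMap stands in for
// Heron's State<String, Integer>; RunningSumSketch and apply() are hypothetical.
class RunningSumSketch {
    static final String KEY = "count";
    static final int RESET_THRESHOLD = 1_000_000;

    // Adds one incoming value to the stored sum, resetting to zero past the threshold
    static int apply(Map<String, Integer> state, int incoming) {
        int newSum = state.getOrDefault(KEY, 0) + incoming;
        if (newSum > RESET_THRESHOLD) {
            newSum = 0;
        }
        state.put(KEY, newSum);
        return newSum;
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        for (int value : new int[] {68, 25, 24}) {
            System.out.println("The current saved sum is: " + apply(state, value));
        }
    }
}
```

<p>Feeding it the values 68, 25 and 24 prints sums of 68, 93 and 117, the same progression that appears in the sample log output at the end of this guide.</p>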
<h3><a class="anchor" aria-hidden="true" id="putting-the-topology-together"></a><a href="#putting-the-topology-together" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Putting the topology together</h3>
<p>Now that we have a stateful spout and bolt in place, we can build and configure the topology:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.heron.api.Config;
<span class="hljs-keyword">import</span> org.apache.heron.api.HeronSubmitter;
<span class="hljs-keyword">import</span> org.apache.heron.api.exception.AlreadyAliveException;
<span class="hljs-keyword">import</span> org.apache.heron.api.exception.InvalidTopologyException;
<span class="hljs-keyword">import</span> org.apache.heron.api.topology.TopologyBuilder;
<span class="hljs-keyword">import</span> org.apache.heron.api.tuple.Fields;
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">EffectivelyOnceTopology</span> </span>{
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">main</span><span class="hljs-params">(String[] args)</span> <span class="hljs-keyword">throws</span> AlreadyAliveException, InvalidTopologyException </span>{
Config topologyConfig = <span class="hljs-keyword">new</span> Config();
<span class="hljs-comment">// Apply effectively-once semantics and set the checkpoint interval to 10 seconds</span>
topologyConfig.setTopologyReliabilityMode(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE);
topologyConfig.setTopologyStatefulCheckpointIntervalSecs(<span class="hljs-number">10</span>);
<span class="hljs-comment">// Build the topology out of the example spout and bolt</span>
TopologyBuilder topologyBuilder = <span class="hljs-keyword">new</span> TopologyBuilder();
topologyBuilder.setSpout(<span class="hljs-string">"random-int-spout"</span>, <span class="hljs-keyword">new</span> RandomIntSpout());
topologyBuilder.setBolt(<span class="hljs-string">"addition-bolt"</span>, <span class="hljs-keyword">new</span> AdditionBolt())
.fieldsGrouping(<span class="hljs-string">"random-int-spout"</span>, <span class="hljs-keyword">new</span> Fields(<span class="hljs-string">"random-int"</span>));
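<span class="hljs-comment">// The first command-line argument is used as the topology name</span>
HeronSubmitter.submitTopology(args[<span class="hljs-number">0</span>], topologyConfig, topologyBuilder.createTopology());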
}
}
</code></pre>
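<p>The <code>fieldsGrouping</code> call routes each tuple from <code>random-int-spout</code> according to the value of its <code>random-int</code> field, so equal values always reach the same <code>addition-bolt</code> instance. The sketch below illustrates that idea under a stated assumption: it picks a task by taking the field value's hash modulo the number of tasks, a common way to implement a fields grouping but not necessarily Heron's exact hash. <code>FieldsGroupingSketch</code> and its methods are hypothetical.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Illustrative sketch of fields grouping: equal grouping-field values always map
// to the same task. The hash-modulo rule is an assumption for illustration and
// may not match Heron's internal hash function.
class FieldsGroupingSketch {

    // Chooses a target task index in [0, numTasks) for a grouping-field value
    static int targetTask(Object fieldValue, int numTasks) {
        return Math.floorMod(Objects.hashCode(fieldValue), numTasks);
    }

    // Simulates one running sum per bolt task, each fed only its routed values
    static int[] partialSums(List<Integer> values, int numTasks) {
        int[] sums = new int[numTasks];
        for (int value : values) {
            sums[targetTask(value, numTasks)] += value;
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(68, 25, 24, 6, 62);
        System.out.println("1 task:  " + Arrays.toString(partialSums(values, 1)));
        System.out.println("3 tasks: " + Arrays.toString(partialSums(values, 3)));
    }
}
```

<p>With a single bolt instance (assuming the default parallelism of one, since the topology above does not set a parallelism for <code>addition-bolt</code>), one task receives every value and keeps the full sum. With three instances, each keeps a partial sum over only the values routed to it, which is worth keeping in mind before raising the bolt's parallelism.</p>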
<h3><a class="anchor" aria-hidden="true" id="submitting-the-topology"></a><a href="#submitting-the-topology" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Submitting the topology</h3>
<p>The code for this topology can be found in <a href="https://github.com/streamlio/heron-java-effectively-once-example">this GitHub repository</a>. You can clone the repo locally like this:</p>
<pre><code class="hljs css language-bash">$ git <span class="hljs-built_in">clone</span> https://github.com/streamlio/heron-java-effectively-once-example
</code></pre>
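<p>Once you've cloned the repo, and assuming you have <a href="https://maven.apache.org/">Maven</a> installed, you can build the topology and submit it to a <a href="getting-started-local-single-node">running Heron installation</a>. In the repository, the topology's main class is <code>io.streaml.example.effectivelyonce.RunningSumTopology</code>, and the final argument, <code>RunningSumTopology</code>, is passed to its <code>main</code> method (in the example above, <code>args[0]</code> is used as the topology name):</p>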
<pre><code class="hljs css language-bash">$ <span class="hljs-built_in">cd</span> heron-java-effectively-once-example
$ mvn assembly:assembly
$ heron submit <span class="hljs-built_in">local</span> \
target/effectivelyonce-latest-jar-with-dependencies.jar \
io.streaml.example.effectivelyonce.RunningSumTopology \
RunningSumTopology
</code></pre>
<blockquote>
<p>By default, Heron uses the <a href="state-managers-local-fs">local filesystem</a> as its State Manager. If you're running Heron locally using the instructions in the <a href="getting-started-local-single-node">Quick Start Guide</a>, you won't need to change any settings to run this stateful topology with effectively-once semantics.</p>
</blockquote>
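<p>Conceptually, effectively-once processing pairs the checkpoints saved through the State Manager with replay: after a failure, component state is rolled back to the most recent checkpoint and the tuples that followed it are processed again, so each tuple is reflected in the saved state once. The following single-process simulation sketches that idea for the running sum. It is not Heron's implementation: <code>CheckpointReplaySketch</code> is hypothetical, checkpoints are taken every N tuples instead of on the timer configured with <code>setTopologyStatefulCheckpointIntervalSecs</code>, and the 1,000,000 reset is omitted.</p>

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Single-process simulation of checkpoint-and-replay recovery for a running sum.
// Hypothetical sketch: checkpoints are taken every `checkpointEvery` tuples (> 0)
// instead of on a timer, so the outcome is deterministic.
class CheckpointReplaySketch {

    // Sums the stream, simulating one crash after `crashAfter` tuples have been
    // processed (pass -1 for no crash). Returns the final saved sum.
    static int run(List<Integer> stream, int checkpointEvery, int crashAfter) {
        Map<String, Integer> state = new HashMap<>();      // live bolt state
        Map<String, Integer> checkpoint = new HashMap<>(); // last saved state
        int checkpointOffset = 0; // spout position saved with that checkpoint
        boolean crashed = false;

        int offset = 0;
        while (offset < stream.size()) {
            if (!crashed && offset == crashAfter) {
                // Failure: drop live state, restore the last checkpoint and
                // rewind the spout so later tuples are replayed
                crashed = true;
                state = new HashMap<>(checkpoint);
                offset = checkpointOffset;
                continue;
            }
            state.merge("count", stream.get(offset), Integer::sum);
            offset++;
            if (offset % checkpointEvery == 0) {
                // Save bolt state and spout position together
                checkpoint = new HashMap<>(state);
                checkpointOffset = offset;
            }
        }
        return state.getOrDefault("count", 0);
    }

    public static void main(String[] args) {
        List<Integer> stream = Arrays.asList(0, 68, 25, 24, 6, 62);
        System.out.println("No failure:   " + run(stream, 2, -1));
        System.out.println("With failure: " + run(stream, 2, 5));
    }
}
```

<p>Both runs return the same sum: the tuple processed between the last checkpoint and the simulated crash is counted once, because the state that already included it is discarded before the tuple is replayed.</p>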
<p>From there, you can see the log output for the bolt by running the <a href="user-manuals-heron-tracker-runbook">Heron Tracker</a> and <a href="user-manuals-heron-ui">Heron UI</a>:</p>
<pre><code class="hljs css language-bash">$ heron-tracker
<span class="hljs-comment"># In a different terminal window</span>
$ heron-ui
</code></pre>
<blockquote>
<p>For installation instructions for the Heron Tracker and the Heron UI, see the <a href="getting-started-local-single-node">Quick Start Guide</a>.</p>
</blockquote>
<p>Once the Heron UI is running, navigate to <a href="http://localhost:8889">http://localhost:8889</a> and click on the <code>RunningSumTopology</code> link. You should see something like this in the window that opens:</p>
<p><img src="/docs/assets/logical-topology.png" alt="Logical topology drilldown"></p>
<p>Click on <strong>addition-bolt</strong> on the right (under <strong>1 Container and 1 Instances</strong>) and then click on the blue <strong>logs</strong> button. You should see log output like this:</p>
<pre><code class="hljs css language-bash">[2017-10-06 13:39:07 -0700] [STDOUT] stdout: The current saved sum is: 0
[2017-10-06 13:39:07 -0700] [STDOUT] stdout: The current saved sum is: 68
[2017-10-06 13:39:07 -0700] [STDOUT] stdout: The current saved sum is: 93
[2017-10-06 13:39:07 -0700] [STDOUT] stdout: The current saved sum is: 117
[2017-10-06 13:39:07 -0700] [STDOUT] stdout: The current saved sum is: 123
[2017-10-06 13:39:07 -0700] [STDOUT] stdout: The current saved sum is: 185
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/0.20.1-incubating/client-api-docs-overview"><span class="arrow-prev"></span><span>Client API Docs</span></a><a class="docs-next button" href="/docs/0.20.1-incubating/guides-data-model"><span>Heron Data Model</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#specifying-delivery-semantics">Specifying delivery semantics</a></li><li><a href="#stateful-components">Stateful components</a></li><li><a href="#the-state-class">The <code>State</code> class</a></li><li><a href="#example-effectively-once-topology">Example effectively-once topology</a><ul class="toc-headings"><li><a href="#example-stateful-spout">Example stateful spout</a></li><li><a href="#example-stateful-bolt">Example stateful bolt</a></li><li><a href="#putting-the-topology-together">Putting the topology together</a></li><li><a href="#submitting-the-topology">Submitting the topology</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><div class="apache-disclaimer">Apache Heron is an effort undergoing incubation at <a target="_blank" href="https://apache.org/">The Apache Software Foundation (ASF)</a> sponsored by the Apache Incubator PMC. Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects. 
While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.<br/><br/>Apache®, the names of Apache projects, and the feather logo are either <a rel="external" href="https://www.apache.org/foundation/marks/list/">registered trademarks or trademarks</a> of the Apache Software Foundation in the United States and/or other countries.<br/><br/><div class="copyright-box">Copyright © 2023 the Apache Software Foundation, Apache Heron, Heron,
Apache, the Apache feather Logo, and the Apache Heron project logo are either registered
trademarks or trademarks of the Apache Software Foundation.</div></div><div class="apache-links"><a class="item" rel="external" href="https://incubator.apache.org/">Apache Incubator</a><div><a class="item" rel="external" href="https://www.apache.org/">About the ASF</a></div><div><a class="item" rel="external" href="https://www.apache.org/events/current-event">Events</a></div><div><a class="item" rel="external" href="https://www.apache.org/foundation/thanks.html">Thanks</a></div><div><a class="item" rel="external" href="https://www.apache.org/foundation/sponsorship.html">Become a Sponsor</a></div><div><a class="item" rel="external" href="https://www.apache.org/security/">Security</a></div><div><a class="item" rel="external" href="https://www.apache.org/licenses/">License</a></div></div></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>