<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>The Heron Streamlet API for Java · Apache Heron</title><meta name="viewport" content="width=device-width"/><meta name="generator" content="Docusaurus"/><meta name="description" content="&lt;!--"/><meta name="docsearch:version" content="0.20.4-incubating"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="The Heron Streamlet API for Java · Apache Heron"/><meta property="og:type" content="website"/><meta property="og:url" content="https://heron.apache.org/"/><meta property="og:description" content="&lt;!--"/><meta property="og:image" content="https://heron.apache.org/img/undraw_online.svg"/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://heron.apache.org/img/undraw_tweetstorm.svg"/><link rel="shortcut icon" href="/img/favicon-32x32.png"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/default.min.css"/><link rel="alternate" type="application/atom+xml" href="https://heron.apache.org/blog/atom.xml" title="Apache Heron Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://heron.apache.org/blog/feed.xml" title="Apache Heron Blog RSS Feed"/><script>
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-198017384-1', 'auto');
ga('send', 'pageview');
</script><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="/js/custom.js"></script><script type="text/javascript" src="/js/fix-location.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/"><img class="logo" src="/img/HeronTextLogo-small.png" alt="Apache Heron"/><h2 class="headerTitleWithLogo">Apache Heron</h2></a><a href="/versions"><h3>0.20.4-incubating</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class=""><a href="/api/java" target="_self">Javadocs</a></li><li class=""><a href="/api/python" target="_self">Pydocs</a></li><li class="siteNavGroupActive"><a href="/docs/0.20.4-incubating/getting-started-local-single-node" target="_self">Docs</a></li><li class=""><a href="/download" target="_self">Downloads</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#apache" target="_self">Apache</a></li></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="container docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Topology Development APIs</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Getting Started</h3><ul class=""><li class="navListItem"><a class="navItem" 
href="/docs/0.20.4-incubating/getting-started-local-single-node">Local (Single Node)</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/getting-started-migrate-storm-topologies">Migrate Storm Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/getting-started-docker">Heron &amp; Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/getting-started-troubleshooting-guide">Troubleshooting Guide</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/deployment-overview">Deployment Overiew</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/deployment-configuration">Configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/deployment-api-server">The Heron API Server</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Topology Development APIs</h3><ul class=""><li class="navListItem navListItemActive"><a class="navItem" href="/docs/0.20.4-incubating/topology-development-streamlet-api">The Heron Streamlet API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/topology-development-eco-api">The ECO API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/topology-development-topology-api-java">The Heron Topology API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/topology-development-topology-api-python">The Heron Topology API for Python</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/topology-development-streamlet-scala">The Heron Streamlet API for Scala</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client API Docs</h3><ul class=""><li class="navListItem"><a class="navItem" 
href="/docs/0.20.4-incubating/client-api-docs-overview">Client API Docs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Guides</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-effectively-once-java-topologies">Effectively Once Java Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-data-model">Heron Data Model</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-tuple-serialization">Tuple Serialization</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-ui-guide">Heron UI Guide</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-topology-tuning">Topology Tuning Guide</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-packing-algorithms">Packing Algorithms</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-simulator-mode">Simulator Mode</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-troubeshooting-guide">Topology Troubleshooting Guide</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Heron Concepts</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/heron-design-goals">Heron Design Goals</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/heron-topology-concepts">Heron Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/heron-streamlet-concepts">Heron Streamlets</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/heron-architecture">Heron Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/heron-delivery-semantics">Heron Delivery Semantics</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">State 
Managers</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/state-managers-zookeeper">Zookeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/state-managers-local-fs">Local File System</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Uploaders</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/uploaders-local-fs">Local File System</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/uploaders-hdfs">HDFS</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/uploaders-http">HTTP</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/uploaders-amazon-s3">Amazon S3</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/uploaders-scp">Secure Copy (SCP)</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Schedulers</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-k8s-by-hand">Kubernetes by hand</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-k8s-with-helm">Kubernetes with Helm</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-aurora-cluster">Aurora Cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-aurora-local">Aurora Locally</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-local">Local Cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-nomad">Nomad</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-mesos-local-mac">Mesos Cluster Locally</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-slurm">Slurm Cluster</a></li><li class="navListItem"><a 
class="navItem" href="/docs/0.20.4-incubating/schedulers-yarn">YARN Cluster</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cluster Configuration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/cluster-config-overview">Cluster Config Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/cluster-config-system-level">System Level Configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/cluster-config-instance">Heron Instance</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/cluster-config-metrics">Metrics Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/cluster-config-stream">Stream Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/cluster-config-tmanager">Topology Manager</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Observability</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/observability-prometheus">Prometheus</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/observability-graphite">Graphite</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/observability-scribe">Scribe</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">User Manuals</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/user-manuals-heron-cli">Heron Client</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/user-manuals-heron-explorer">Heron Explorer</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/user-manuals-tracker-rest">Heron Tracker REST API</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/user-manuals-heron-tracker-runbook">Heron Tracker Runbook</a></li><li 
class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/user-manuals-heron-ui-runbook">Heron UI Runbook</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/user-manuals-heron-shell">Heron Shell</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Compiling</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/compiling-overview">Compiling Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/compiling-linux">Compiling on Linux</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/compiling-osx">Compiling on OS X</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/compiling-docker">Compiling With Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/compiling-running-tests">Running Tests</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/compiling-code-organization">Code Organization</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Extending Heron</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/extending-heron-scheduler">Custom Scheduler</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/extending-heron-metric-sink">Custom Metrics Sink</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Heron Resources</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/heron-resources-resources">Heron Resources</a></li></ul></div></div></section></div><script>
var coll = document.getElementsByClassName('collapsible');
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
var links = coll[i].nextElementSibling.getElementsByTagName('*');
if (checkActiveCategory){
for (var j = 0; j < links.length; j++) {
if (links[j].classList.contains('navListItemActive')){
coll[i].nextElementSibling.classList.toggle('hide');
coll[i].childNodes[1].classList.toggle('rotate');
checkActiveCategory = false;
break;
}
}
}
coll[i].addEventListener('click', function() {
var arrow = this.childNodes[1];
arrow.classList.toggle('rotate');
var content = this.nextElementSibling;
content.classList.toggle('hide');
});
}
document.addEventListener('DOMContentLoaded', function() {
createToggler('#navToggler', '#docsNav', 'docsSliderActive');
createToggler('#tocToggler', 'body', 'tocActive');
var headings = document.querySelector('.toc-headings');
headings && headings.addEventListener('click', function(event) {
var el = event.target;
while(el !== headings){
if (el.tagName === 'A') {
document.body.classList.remove('tocActive');
break;
} else{
el = el.parentNode;
}
}
}, false);
function createToggler(togglerSelector, targetSelector, className) {
var toggler = document.querySelector(togglerSelector);
var target = document.querySelector(targetSelector);
if (!toggler) {
return;
}
toggler.onclick = function(event) {
event.preventDefault();
target.classList.toggle(className);
};
}
});
</script></nav></div><div class="container mainContainer"><div class="wrapper"><div class="post"><header class="postHeader"><h1 class="postHeaderTitle">The Heron Streamlet API for Java</h1></header><article><div><span><!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<blockquote>
<p><strong>The Heron Streamlet API is in beta.</strong>
The Heron Streamlet API is well tested and can be used to build and test topologies locally. The API is not yet fully stable, however, and breaking changes are likely in the coming weeks.</p>
</blockquote>
<p>Heron processing topologies can be written using an API called the <strong>Heron Streamlet API</strong>. The Heron Streamlet API is currently available for the following languages:</p>
<ul>
<li><a href="topology-development-streamlet-api">Java</a></li>
<li><a href="topology-development-streamlet-scala">Scala</a></li>
</ul>
<blockquote>
<p>Although this document covers the new Heron Streamlet API, topologies created using the original <a href="topology-development-topology-api-java">topology API</a> can still be used with Heron (which means that all of your older topologies will still run).</p>
</blockquote>
<p>For a more in-depth conceptual guide to the new API, see <a href="heron-streamlet-concepts">Heron Streamlets</a>. A high-level overview can also be found in the section immediately <a href="#the-heron-streamlet-api-vs-the-topology-api">below</a>.</p>
<h2><a class="anchor" aria-hidden="true" id="the-heron-streamlet-api-vs-the-topology-api"></a><a href="#the-heron-streamlet-api-vs-the-topology-api" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>The Heron Streamlet API vs. The Topology API</h2>
<p>When Heron was first released, all Heron topologies needed to be written using an API based on the <a href="topology-development-topology-api-java">Storm Topology API</a>. Although this API is quite powerful (and can still be used), the <strong>Heron Streamlet API</strong> enables you to create topologies without needing to implement spouts and bolts directly or to connect spouts and bolts together.</p>
<p>Here are some crucial differences between the two APIs:</p>
<table>
<thead>
<tr><th style="text-align:left">Domain</th><th style="text-align:left">Original Topology API</th><th style="text-align:left">Heron Streamlet API</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">Programming style</td><td style="text-align:left">Procedural, based on processing components</td><td style="text-align:left">Functional</td></tr>
<tr><td style="text-align:left">Abstraction level</td><td style="text-align:left"><strong>Low level</strong>. Developers must think in terms of &quot;physical&quot; spout and bolt implementation logic.</td><td style="text-align:left"><strong>High level</strong>. Developers can write processing logic in an idiomatic fashion in the language of their choice, without needing to write and connect spouts and bolts.</td></tr>
<tr><td style="text-align:left">Processing model</td><td style="text-align:left"><a href="heron-topology-concepts#spouts">Spout</a> and <a href="heron-topology-concepts#bolts">bolt</a> logic must be created explicitly, and connecting spouts and bolts is the responsibility of the developer</td><td style="text-align:left">Spouts and bolts are created for you automatically on the basis of the processing graph that you build</td></tr>
</tbody>
</table>
<p>The two APIs also have a few things in common:</p>
<ul>
<li>Topologies' <a href="heron-topology-concepts#logical-plan">logical</a> and <a href="heron-topology-concepts#physical-plan">physical</a> plans are automatically created by Heron</li>
<li>Topologies are <a href="user-manuals-heron-cli">managed</a> in the same way using the <code>heron</code> CLI tool</li>
</ul>
<h2><a class="anchor" aria-hidden="true" id="getting-started"></a><a href="#getting-started" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Getting started</h2>
<p>In order to use the Heron Streamlet API for Java, you'll need to install the <code>heron-api</code> library.</p>
<h3><a class="anchor" aria-hidden="true" id="maven-setup"></a><a href="#maven-setup" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Maven setup</h3>
<p>In order to use the <code>heron-api</code> library, add this to the <code>dependencies</code> block of your <code>pom.xml</code> configuration file:</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.heron<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>heron-api<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>{{<span class="hljs-tag">&lt; <span class="hljs-attr">heronVersion</span> &gt;</span>}}<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="compiling-a-jar-with-dependencies"></a><a href="#compiling-a-jar-with-dependencies" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Compiling a JAR with dependencies</h4>
<p>In order to run a Java topology created using the Heron Streamlet API in a Heron cluster, you'll need to package your topology as a &quot;fat&quot; JAR with dependencies included. You can use the <a href="https://maven.apache.org/plugins/maven-assembly-plugin/usage.html">Maven Assembly Plugin</a> to generate JARs with dependencies. To install the plugin and add a Maven goal for a single JAR, add this to the <code>plugins</code> block in your <code>pom.xml</code>:</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">plugin</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>maven-assembly-plugin<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">configuration</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">descriptorRefs</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">descriptorRef</span>&gt;</span>jar-with-dependencies<span class="hljs-tag">&lt;/<span class="hljs-name">descriptorRef</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">descriptorRefs</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">archive</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">manifest</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">mainClass</span>&gt;</span><span class="hljs-tag">&lt;/<span class="hljs-name">mainClass</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">manifest</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">archive</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">configuration</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">executions</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">execution</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">id</span>&gt;</span>make-assembly<span class="hljs-tag">&lt;/<span class="hljs-name">id</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">phase</span>&gt;</span>package<span class="hljs-tag">&lt;/<span class="hljs-name">phase</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">goals</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">goal</span>&gt;</span>single<span class="hljs-tag">&lt;/<span class="hljs-name">goal</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">goals</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">execution</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">executions</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">plugin</span>&gt;</span>
</code></pre>
<p>Because the plugin configuration above binds the assembly plugin's <code>single</code> goal to the <code>package</code> phase, once your <code>pom.xml</code> is set up you can compile the JAR with dependencies by running the standard <code>package</code> phase:</p>
<pre><code class="hljs css language-bash">$ mvn package
</code></pre>
<p>By default, this adds a JAR named <code>PROJECT-NAME-VERSION-jar-with-dependencies.jar</code> to your project's <code>target</code> folder. Here's an example topology submission command using a compiled JAR:</p>
<pre><code class="hljs css language-bash">$ mvn package
$ heron submit <span class="hljs-built_in">local</span> \
target/my-project-1.2.3-jar-with-dependencies.jar \
com.example.Main \
MyTopology arg1 arg2
</code></pre>
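<p>Everything after the class name in a <code>heron submit</code> command (here <code>MyTopology arg1 arg2</code>) is passed to that class's <code>main</code> method. The sketch below is a hypothetical, dependency-free stand-in for <code>com.example.Main</code>; the <code>ExampleMain</code> class and its helpers are illustrative names only, and the sketch assumes, as in the command above, that the first argument is the topology name. A real topology would go on to build its processing graph and hand it to a <code>Runner</code>.</p>
<pre><code class="hljs css language-java">import java.util.Arrays;

// Hypothetical stand-in for com.example.Main: it only shows how the
// arguments from `heron submit` arrive, without any Heron dependency.
class ExampleMain {
  // By the convention used in the command above, the first argument is the topology name.
  static String topologyName(String[] args) {
    if (args.length == 0) {
      throw new IllegalArgumentException("Usage: ExampleMain TOPOLOGY_NAME [ARGS...]");
    }
    return args[0];
  }

  // Everything after the topology name (arg1 arg2 in the command above).
  static String[] remainingArgs(String[] args) {
    return Arrays.copyOfRange(args, 1, args.length);
  }

  public static void main(String[] args) {
    String name = topologyName(args);
    System.out.println("Topology: " + name + ", extra args: " + Arrays.toString(remainingArgs(args)));
    // A real topology would now build a processing graph and pass it, together
    // with a Config object, to Runner.run(name, config, builder).
  }
}
</code></pre>
<p>Invoked with <code>MyTopology arg1 arg2</code>, it prints <code>Topology: MyTopology, extra args: [arg1, arg2]</code>.</p>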
<h3><a class="anchor" aria-hidden="true" id="java-streamlet-api-starter-project"></a><a href="#java-streamlet-api-starter-project" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java Streamlet API starter project</h3>
<p>If you'd like to get up and running quickly with the Heron Streamlet API for Java, you can browse the <a href="https://github.com/apache/incubator-heron/tree/{{ heron:version }}/examples/src/java/org/apache/heron/examples/streamlet">example streamlet topologies</a> in the Heron repository.</p>
<p>If you're running a <a href="getting-started-local-single-node">local Heron cluster</a>, you can submit the built example topology like this:</p>
<pre><code class="hljs css language-bash">$ heron submit <span class="hljs-built_in">local</span> \
~/.heron/examples/heron-streamlet-examples.jar \
org.apache.heron.examples.streamlet.WindowedWordCountTopology \
streamletWindowedWordCount
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="selecting-delivery-semantics"></a><a href="#selecting-delivery-semantics" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Selecting delivery semantics</h4>
<p>Heron enables you to apply one of three <a href="heron-delivery-semantics">delivery semantics</a> to any Heron topology. For the example topology above, you can select the delivery semantics when you submit the topology with the topology's second argument. This command, for example, would apply <a href="heron-delivery-semantics">effectively-once</a> to the example topology:</p>
<pre><code class="hljs css language-bash">$ heron submit <span class="hljs-built_in">local</span> \
~/.heron/examples/heron-streamlet-examples.jar \
org.apache.heron.examples.streamlet.WireRequestsTopology \
wireRequestsTopology
</code></pre>
<p>The other options are <code>at-most-once</code> and <code>at-least-once</code>. If you don't explicitly select the delivery semantics, at-least-once semantics will be applied.</p>
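<p>If you want your own topology to accept delivery semantics as its second argument in this way, its <code>main</code> method has to translate the string itself. Below is a minimal, dependency-free sketch of one approach, assuming the three spellings listed above and an at-least-once fallback. The <code>Semantics</code> enum is a hypothetical stand-in whose constant names mirror <code>Config.DeliverySemantics</code>, and the bundled example topologies may handle their arguments differently.</p>
<pre><code class="hljs css language-java">import java.util.Locale;

// Hypothetical stand-in whose constant names mirror Config.DeliverySemantics.
enum Semantics { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE }

class SemanticsArg {
  // Reads the optional second argument; falls back to at-least-once when it is absent.
  static Semantics fromArgs(String[] args) {
    return args.length >= 2 ? parse(args[1]) : Semantics.ATLEAST_ONCE;
  }

  // Maps the command-line spelling to an enum constant (case-insensitive).
  static Semantics parse(String value) {
    switch (value.toLowerCase(Locale.ROOT)) {
      case "at-most-once":
        return Semantics.ATMOST_ONCE;
      case "at-least-once":
        return Semantics.ATLEAST_ONCE;
      case "effectively-once":
        return Semantics.EFFECTIVELY_ONCE;
      default:
        throw new IllegalArgumentException("Unknown delivery semantics: " + value);
    }
  }
}
</code></pre>
<p>In a real topology, you would pass the corresponding <code>Config.DeliverySemantics</code> constant to <code>setDeliverySemantics</code> when building the topology's <code>Config</code>.</p>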
<h2><a class="anchor" aria-hidden="true" id="streamlet-api-topology-configuration"></a><a href="#streamlet-api-topology-configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Streamlet API topology configuration</h2>
<p>Every Streamlet API topology needs to be configured using a <code>Config</code> object. Here's an example default configuration:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.Builder;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.Config;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.Runner;
<span class="hljs-comment">// The processing graph is built with a Builder (streamlet operations omitted here)</span>
Builder topologyBuilder = Builder.newBuilder();
Config topologyConfig = Config.defaultConfig();
<span class="hljs-comment">// Apply topology configuration using the topologyConfig object</span>
Runner topologyRunner = <span class="hljs-keyword">new</span> Runner();
topologyRunner.run(<span class="hljs-string">"name-for-topology"</span>, topologyConfig, topologyBuilder);
</code></pre>
<p>The table below shows the configurable parameters for Heron topologies:</p>
<table>
<thead>
<tr><th style="text-align:left">Parameter</th><th style="text-align:left">Default</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="#delivery-semantics">Delivery semantics</a></td><td style="text-align:left">At most once</td></tr>
<tr><td style="text-align:left">Serializer</td><td style="text-align:left"><a href="https://github.com/EsotericSoftware/kryo">Kryo</a></td></tr>
<tr><td style="text-align:left">Total number of containers</td><td style="text-align:left">2</td></tr>
<tr><td style="text-align:left">Per-container CPU</td><td style="text-align:left">1.0</td></tr>
<tr><td style="text-align:left">Per-container RAM</td><td style="text-align:left">100 MB</td></tr>
</tbody>
</table>
<p>Here's an example non-default configuration:</p>
<pre><code class="hljs css language-java">Config topologyConfig = Config.newBuilder()
.setNumContainers(<span class="hljs-number">5</span>)
.setPerContainerRamInGigabytes(<span class="hljs-number">10</span>)
.setPerContainerCpu(<span class="hljs-number">3.5f</span>)
.setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE)
.setSerializer(Config.Serializer.JAVA)
.setUserConfig(<span class="hljs-string">"some-key"</span>, <span class="hljs-string">"some-value"</span>)
.build();
</code></pre>
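<p>Because CPU and RAM are set per container, the topology's total request grows with the number of containers. The short sketch below works through that arithmetic for both the defaults in the table and the non-default example; it assumes 1 GB = 1024 MB and ignores any additional overhead Heron itself may reserve, and the <code>ResourceTotals</code> class is purely illustrative.</p>
<pre><code class="hljs css language-java">// Illustrative arithmetic only: total requested = number of containers x per-container value.
class ResourceTotals {
  static double totalCpu(int numContainers, double perContainerCpu) {
    return numContainers * perContainerCpu;
  }

  static long totalRamMb(int numContainers, long perContainerRamMb) {
    return numContainers * perContainerRamMb;
  }

  public static void main(String[] args) {
    // Defaults from the table: 2 containers with 1.0 CPU and 100 MB of RAM each.
    System.out.println(totalCpu(2, 1.0) + " CPU, " + totalRamMb(2, 100) + " MB");
    // Non-default example: 5 containers with 3.5 CPU and 10 GB (assumed 10 * 1024 MB) each.
    System.out.println(totalCpu(5, 3.5) + " CPU, " + totalRamMb(5, 10 * 1024) + " MB");
  }
}
</code></pre>
<p>Running it prints <code>2.0 CPU, 200 MB</code> for the defaults and <code>17.5 CPU, 51200 MB</code> for the non-default configuration.</p>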
<h3><a class="anchor" aria-hidden="true" id="delivery-semantics"></a><a href="#delivery-semantics" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Delivery semantics</h3>
<p>You can apply <a href="heron-delivery-semantics">delivery semantics</a> to a Streamlet API topology like this:</p>
<pre><code class="hljs css language-java">Config topologyConfig = Config.newBuilder()
.setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE)
.build();
</code></pre>
<p>The other available options in the <code>DeliverySemantics</code> enum are <code>ATMOST_ONCE</code> and <code>ATLEAST_ONCE</code>.</p>
<h2><a class="anchor" aria-hidden="true" id="streamlets"></a><a href="#streamlets" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Streamlets</h2>
<p>In the Heron Streamlet API for Java, processing graphs consist of streamlets. One or more supplier streamlets inject data into your graph to be processed by downstream operators.</p>
<h2><a class="anchor" aria-hidden="true" id="operations"></a><a href="#operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Operations</h2>
<table>
<thead>
<tr><th style="text-align:left">Operation</th><th style="text-align:left">Description</th><th style="text-align:left">Example</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="#map-operations"><code>map</code></a></td><td style="text-align:left">Create a new streamlet by applying the supplied mapping function to each element in the original streamlet</td><td style="text-align:left">Add 1 to each element in a streamlet of integers</td></tr>
<tr><td style="text-align:left"><a href="#flatmap-operations"><code>flatMap</code></a></td><td style="text-align:left">Like a map operation, except that the function returns a collection for each element and the elements of those collections are flattened into the resulting streamlet</td><td style="text-align:left">Flatten a sentence into individual words</td></tr>
<tr><td style="text-align:left"><a href="#filter-operations"><code>filter</code></a></td><td style="text-align:left">Create a new streamlet containing only the elements that satisfy the supplied filtering function</td><td style="text-align:left">Remove all inappropriate words from a streamlet of strings</td></tr>
<tr><td style="text-align:left"><a href="#union-operations"><code>union</code></a></td><td style="text-align:left">Unifies two streamlets into one, without modifying the elements of the two streamlets</td><td style="text-align:left">Unite two different <code>Streamlet&lt;String&gt;</code>s into a single streamlet</td></tr>
<tr><td style="text-align:left"><a href="#clone-operations"><code>clone</code></a></td><td style="text-align:left">Creates any number of identical copies of a streamlet</td><td style="text-align:left">Create three separate streamlets from the same source</td></tr>
<tr><td style="text-align:left"><a href="#transform-operations"><code>transform</code></a></td><td style="text-align:left">Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations)</td><td style="text-align:left">Keep a stateful count of the number of items processed</td></tr>
<tr><td style="text-align:left"><a href="#join-operations"><code>join</code></a></td><td style="text-align:left">Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key. Supported Join Types: Inner (as default), Outer-Left, Outer-Right and Outer.</td><td style="text-align:left">Combine key-value pairs listing current scores (e.g. <code>(&quot;h4x0r&quot;, 127)</code>) for each user into a single per-user stream</td></tr>
<tr><td style="text-align:left"><a href="#key-by-operations"><code>keyBy</code></a></td><td style="text-align:left">Returns a new key-value streamlet by applying the supplied key and value extractors to each element in the original streamlet</td><td style="text-align:left">Key each word by itself, with the word's length as the value</td></tr>
<tr><td style="text-align:left"><a href="#reduce-by-key-operations"><code>reduceByKey</code></a></td><td style="text-align:left">Produces a key-value streamlet in which the values for each key are combined using the supplied reduce function</td><td style="text-align:left">Count the number of times a value has been encountered</td></tr>
<tr><td style="text-align:left"><a href="#reduce-by-key-and-window-operations"><code>reduceByKeyAndWindow</code></a></td><td style="text-align:left">Produces a key-value streamlet in which, within each time window, the values for each key are combined using the supplied reduce function</td><td style="text-align:left">Count the number of times a value has been encountered within a specified time window</td></tr>
<tr><td style="text-align:left"><a href="#count-by-key-operations"><code>countByKey</code></a></td><td style="text-align:left">A special reduce operation that counts the number of elements for each key</td><td style="text-align:left">Count the number of times a value has been encountered</td></tr>
<tr><td style="text-align:left"><a href="#count-by-key-and-window-operations"><code>countByKeyAndWindow</code></a></td><td style="text-align:left">A special reduce operation that counts the number of elements for each key within each time window</td><td style="text-align:left">Count the number of times a value has been encountered within a specified time window</td></tr>
<tr><td style="text-align:left"><a href="#split-operations"><code>split</code></a></td><td style="text-align:left">Split a streamlet into multiple streams, each identified by a stream ID</td><td style="text-align:left">Split a streamlet of words into streams of long and short words</td></tr>
<tr><td style="text-align:left"><a href="#with-stream-operations"><code>withStream</code></a></td><td style="text-align:left">Select a single stream, by its ID, from a streamlet that contains multiple streams</td><td style="text-align:left">Select the stream of short words produced by a split</td></tr>
<tr><td style="text-align:left"><a href="#apply-operator-operations"><code>applyOperator</code></a></td><td style="text-align:left">Returns a new streamlet by applying a user-defined operator to the original streamlet</td><td style="text-align:left">Apply an existing bolt as an operator</td></tr>
<tr><td style="text-align:left"><a href="#repartition-operations"><code>repartition</code></a></td><td style="text-align:left">Create a new streamlet by applying a new parallelism level to the original streamlet</td><td style="text-align:left">Increase the parallelism of a streamlet from 5 to 10</td></tr>
<tr><td style="text-align:left"><a href="#sink-operations"><code>toSink</code></a></td><td style="text-align:left">Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc.</td><td style="text-align:left">Store processing graph results in an AWS Redshift table</td></tr>
<tr><td style="text-align:left"><a href="#log-operations"><code>log</code></a></td><td style="text-align:left">Logs the final results of a processing graph to stdout. This <em>must</em> be the last step in the graph.</td><td style="text-align:left">Log the word counts produced by a reduce by key operation</td></tr>
<tr><td style="text-align:left"><a href="#consume-operations"><code>consume</code></a></td><td style="text-align:left">Consume operations are like sink operations except they don't require implementing a full sink interface (consume operations are thus suited for simple operations like logging)</td><td style="text-align:left">Log processing graph results using a custom formatting function</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="map-operations"></a><a href="#map-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Map operations</h3>
<p>Map operations create a new streamlet by applying the supplied mapping function to each element in the original streamlet. Here's an example:</p>
<pre><code class="hljs css language-java">builder.newSource(() -&gt; <span class="hljs-number">1</span>)
.map(i -&gt; i + <span class="hljs-number">12</span>);
</code></pre>
<p>In this example, a supplier streamlet emits an indefinite series of 1s. The <code>map</code> operation then adds 12 to each incoming element, producing a streamlet of 13s.</p>
<h3><a class="anchor" aria-hidden="true" id="flatmap-operations"></a><a href="#flatmap-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>FlatMap operations</h3>
<p>FlatMap operations are like <code>map</code> operations, with the important difference that the supplied function returns a collection for each element, and the elements of that collection are then &quot;flattened&quot; into the resulting streamlet as individual elements. In this example, a supplier streamlet emits the same sentence over and over again; the <code>flatMap</code> operation splits each sentence into a Java <code>List</code> of individual words, and each word becomes a separate element of the new streamlet:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;
builder.newSource(() -&gt; <span class="hljs-string">"I have nothing to declare but my genius"</span>)
.flatMap((sentence) -&gt; Arrays.asList(sentence.split(<span class="hljs-string">"\\s+"</span>)));
</code></pre>
<p>The effect of this operation is to transform a <code>Streamlet&lt;String&gt;</code> of sentences into a <code>Streamlet&lt;String&gt;</code> of individual words.</p>
<blockquote>
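<p>One of the core differences between <code>map</code> and <code>flatMap</code> operations is that a <code>map</code> operation produces exactly one output element for each input element, whereas a <code>flatMap</code> operation can produce zero, one, or many output elements for each input element.</p>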
</blockquote>
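<p>To make the difference between <code>map</code> and <code>flatMap</code> concrete, here's a small sketch that uses plain <code>java.util.stream</code> on a finite list of sentences instead of the Heron API, so that it runs without a Heron dependency. The <code>FlatMapSketch</code> class and its methods are hypothetical, and <code>Stream.map</code> and <code>Stream.flatMap</code> stand in for their streamlet counterparts purely for illustration.</p>
<pre><code class="hljs css language-java">import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration class (not part of the Heron API)
class FlatMapSketch {
  // map: exactly one output element per sentence (a List of words)
  static List&lt;List&lt;String&gt;&gt; mapToWordLists(List&lt;String&gt; sentences) {
    return sentences.stream()
        .map(sentence -&gt; Arrays.asList(sentence.split("\\s+")))
        .collect(Collectors.toList());
  }

  // flatMap: each sentence yields several words, flattened into a single list of words
  static List&lt;String&gt; flatMapToWords(List&lt;String&gt; sentences) {
    return sentences.stream()
        .flatMap(sentence -&gt; Arrays.stream(sentence.split("\\s+")))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List&lt;String&gt; sentences = Arrays.asList("I have nothing", "to declare");
    System.out.println(mapToWordLists(sentences)); // [[I, have, nothing], [to, declare]]
    System.out.println(flatMapToWords(sentences)); // [I, have, nothing, to, declare]
  }
}
</code></pre>
<p>With <code>map</code>, two sentences produce two elements (two lists); with <code>flatMap</code>, the same two sentences produce five elements (one per word).</p>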
<h3><a class="anchor" aria-hidden="true" id="filter-operations"></a><a href="#filter-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Filter operations</h3>
<p>Filter operations create a new streamlet containing only the elements of the original streamlet that satisfy the supplied filtering function. Here's an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.concurrent.ThreadLocalRandom;
builder.newSource(() -&gt; ThreadLocalRandom.current().nextInt(<span class="hljs-number">1</span>, <span class="hljs-number">11</span>))
.filter((i) -&gt; i &lt; <span class="hljs-number">7</span>);
</code></pre>
<p>In this example, a source streamlet consisting of random integers between 1 and 10 is modified by a <code>filter</code> operation that removes all streamlet elements that are greater than 6.</p>
<h3><a class="anchor" aria-hidden="true" id="union-operations"></a><a href="#union-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Union operations</h3>
<p>Union operations combine two streamlets of the same type into a single streamlet without modifying the elements. Here's an example:</p>
<pre><code class="hljs css language-java">Streamlet&lt;String&gt; flowers = builder.newSource(() -&gt; <span class="hljs-string">"flower"</span>);
Streamlet&lt;String&gt; butterflies = builder.newSource(() -&gt; <span class="hljs-string">"butterfly"</span>);
Streamlet&lt;String&gt; combinedSpringStreamlet = flowers
.union(butterflies);
</code></pre>
<p>Here, one streamlet is an endless series of &quot;flower&quot; strings while the other is an endless series of &quot;butterfly&quot; strings. The <code>union</code> operation combines them into a single streamlet, <code>combinedSpringStreamlet</code>, that contains the elements of both; the elements of the two sources are interleaved, but strict alternation is not guaranteed.</p>
<h3><a class="anchor" aria-hidden="true" id="clone-operations"></a><a href="#clone-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Clone operations</h3>
<p>Clone operations enable you to create any number of &quot;copies&quot; of a streamlet. Each of the &quot;copy&quot; streamlets contains all the elements of the original and can be manipulated just like the original streamlet. Here's an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.List;
<span class="hljs-keyword">import</span> java.util.concurrent.ThreadLocalRandom;
Streamlet&lt;Integer&gt; integers = builder.newSource(() -&gt; ThreadLocalRandom.current().nextInt(<span class="hljs-number">100</span>));
List&lt;Streamlet&lt;Integer&gt;&gt; copies = integers.clone(<span class="hljs-number">5</span>);
Streamlet&lt;Integer&gt; ints1 = copies.get(<span class="hljs-number">0</span>);
Streamlet&lt;Integer&gt; ints2 = copies.get(<span class="hljs-number">1</span>);
Streamlet&lt;Integer&gt; ints3 = copies.get(<span class="hljs-number">2</span>);
<span class="hljs-comment">// and so on...</span>
</code></pre>
<p>In this example, a streamlet of random integers between 0 and 99 is cloned into 5 identical streamlets.</p>
<h3><a class="anchor" aria-hidden="true" id="transform-operations"></a><a href="#transform-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Transform operations</h3>
<p>Transform operations are highly flexible operations that are most useful for:</p>
<ul>
<li>operations involving state in <a href="heron-delivery-semantics#stateful-topologies">stateful topologies</a></li>
<li>operations that don't neatly fit into the other categories or into a lambda-based logic</li>
</ul>
<p>Transform operations require you to implement three different methods:</p>
<ul>
<li>A <code>setup</code> method that receives a context object and lets you specify what happens before any elements are transformed</li>
<li>A <code>transform</code> method that performs the desired transformation on each incoming element and passes the result to a <code>Consumer</code></li>
<li>A <code>cleanup</code> method that lets you specify what happens after the transformation is finished</li>
</ul>
<p>The context object available to a transform operation provides access to:</p>
<ul>
<li>the current state of the topology</li>
<li>the topology's configuration</li>
<li>the name of the stream</li>
<li>the stream partition</li>
<li>the current task ID</li>
</ul>
<p>Here's a Java example of a transform operation that keeps a stateful count of the number of items processed:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.Context;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.SerializableTransformer;
<span class="hljs-keyword">import</span> java.util.function.Consumer;
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">CountNumberOfItems</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">SerializableTransformer</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">String</span>&gt; </span>{
<span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">final</span> String COUNT_KEY = <span class="hljs-string">"number-of-items"</span>;
<span class="hljs-comment">// Assigned in setup(); transient because the transformer is serialized before setup() runs</span>
<span class="hljs-keyword">private</span> <span class="hljs-keyword">transient</span> Context context;
<span class="hljs-keyword">private</span> <span class="hljs-keyword">int</span> numberOfItems;
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">setup</span><span class="hljs-params">(Context context)</span> </span>{
<span class="hljs-keyword">this</span>.context = context;
<span class="hljs-comment">// The state has Map semantics; the key is absent the first time the topology runs</span>
Object saved = context.getState().get(COUNT_KEY);
numberOfItems = (saved == <span class="hljs-keyword">null</span>) ? <span class="hljs-number">0</span> : (Integer) saved;
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">transform</span><span class="hljs-params">(String in, Consumer&lt;String&gt; consumer)</span> </span>{
<span class="hljs-comment">// Placeholder transformation: apply your own operation to the incoming value</span>
String transformedString = in.toUpperCase();
<span class="hljs-comment">// Count this item and save the new count as the state</span>
numberOfItems++;
context.getState().put(COUNT_KEY, numberOfItems);
consumer.accept(transformedString);
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">cleanup</span><span class="hljs-params">()</span> </span>{
System.out.println(
String.format(<span class="hljs-string">"Successfully processed new state: %d"</span>, numberOfItems));
}
}
</code></pre>
<p>This operation does a few things:</p>
<ul>
<li>In the <code>setup</code> method, the <a href="/api/java/org/apache/heron/streamlet/Context.html"><code>Context</code></a> object is used to access the current state (which has the semantics of a Java <code>Map</code>), and the previously saved count of items processed is restored, or initialized to 0 if no count has been saved yet.</li>
<li>In the <code>transform</code> method, the incoming string is transformed in some way (here, a placeholder conversion to uppercase), the count is incremented by one and saved as the new state, and the transformed string is then &quot;accepted&quot; as the new value.</li>
<li>In the <code>cleanup</code> step, the current count of items processed is logged.</li>
</ul>
<p>Here's that operation within the context of a streamlet processing graph:</p>
<pre><code class="hljs css language-java">builder.newSource(() -&gt; <span class="hljs-string">"Some string over and over"</span>)
.transform(<span class="hljs-keyword">new</span> CountNumberOfItems())
.log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="join-operations"></a><a href="#join-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Join operations</h3>
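<p>Join operations unify two streamlets <em>on a key</em> (join operations thus require KV streamlets). Each <code>KeyValue</code> object in a streamlet has, by definition, a key. When a join operation is added to a processing graph, elements from the two streamlets that have the same key and fall within the same window are combined by the supplied join function into a single element of the resulting streamlet. Here's an example:</p>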
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.WindowConfig;
Builder builder = Builder.newBuilder();
KVStreamlet&lt;String, String&gt; streamlet1 =
builder.newKVSource(() -&gt; <span class="hljs-keyword">new</span> KeyValue&lt;&gt;(<span class="hljs-string">"heron-api"</span>, <span class="hljs-string">"topology-api"</span>));
builder.newSource(() -&gt; <span class="hljs-keyword">new</span> KeyValue&lt;&gt;(<span class="hljs-string">"heron-api"</span>, <span class="hljs-string">"streamlet-api"</span>))
.join(streamlet1, WindowConfig.TumblingCountWindow(<span class="hljs-number">10</span>), KeyValue::create);
</code></pre>
<p>In this case, the resulting streamlet would consist of an indefinite stream with two <code>KeyValue</code> objects with the key <code>heron-api</code> but different values (<code>topology-api</code> and <code>streamlet-api</code>).</p>
<blockquote>
<p>The effect of a join operation is to create a new streamlet <em>for each key</em>.</p>
</blockquote>
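<p>Conceptually, once a window closes, an inner join pairs every element from one streamlet with every element from the other streamlet that has the same key. The plain-Java sketch below shows that computation for the key-value pairs collected from the two streamlets within a single window, using a hash join. It is not Heron's implementation; the <code>WindowedJoinSketch</code> class, the <code>kv</code> helper, and the <code>innerJoin</code> method are hypothetical names used only for illustration.</p>
<pre><code class="hljs css language-java">import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical illustration class (not part of the Heron API)
class WindowedJoinSketch {
  static &lt;K, V&gt; Map.Entry&lt;K, V&gt; kv(K key, V value) {
    return new SimpleEntry&lt;&gt;(key, value);
  }

  // Inner join of the elements that fell into one window: index the right side by key,
  // then pair each left element with every right value that shares its key.
  static &lt;K, V1, V2, R&gt; List&lt;Map.Entry&lt;K, R&gt;&gt; innerJoin(
      List&lt;Map.Entry&lt;K, V1&gt;&gt; left,
      List&lt;Map.Entry&lt;K, V2&gt;&gt; right,
      BiFunction&lt;V1, V2, R&gt; joinFn) {
    Map&lt;K, List&lt;V2&gt;&gt; rightByKey = new HashMap&lt;&gt;();
    for (Map.Entry&lt;K, V2&gt; r : right) {
      rightByKey.computeIfAbsent(r.getKey(), k -&gt; new ArrayList&lt;&gt;()).add(r.getValue());
    }
    List&lt;Map.Entry&lt;K, R&gt;&gt; joined = new ArrayList&lt;&gt;();
    for (Map.Entry&lt;K, V1&gt; l : left) {
      for (V2 v2 : rightByKey.getOrDefault(l.getKey(), Collections.&lt;V2&gt;emptyList())) {
        joined.add(kv(l.getKey(), joinFn.apply(l.getValue(), v2)));
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    List&lt;Map.Entry&lt;String, String&gt;&gt; left = Arrays.asList(kv("heron-api", "topology-api"));
    List&lt;Map.Entry&lt;String, String&gt;&gt; right =
        Arrays.asList(kv("heron-api", "streamlet-api"), kv("other-key", "unmatched"));
    List&lt;Map.Entry&lt;String, String&gt;&gt; joined = innerJoin(left, right, (a, b) -&gt; a + "+" + b);
    System.out.println(joined); // [heron-api=topology-api+streamlet-api]
  }
}
</code></pre>
<p>The element keyed <code>other-key</code> has no partner in the other window, so an inner join drops it; the outer join types listed in the operations table would additionally emit such unmatched elements.</p>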
<h3><a class="anchor" aria-hidden="true" id="key-by-operations"></a><a href="#key-by-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Key by operations</h3>
<p>Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet. Here is an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;
Builder builder = Builder.newBuilder();
builder.newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)
<span class="hljs-comment">// Convert each sentence into individual words</span>
.flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
.keyBy(
<span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span>
word -&gt; word,
<span class="hljs-comment">// Value extractor (get the length of each word)</span>
word -&gt; word.length()
)
<span class="hljs-comment">// The result is logged</span>
.log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="reduce-by-key-operations"></a><a href="#reduce-by-key-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reduce by key operations</h3>
<p>You can apply <a href="https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html">reduce</a> operations to streamlets by specifying:</p>
<ul>
<li>a key extractor that determines the key for each element of the streamlet</li>
<li>a value extractor that determines the value taken from each element, which is then passed to the reduce function</li>
<li>a reduce function that produces a single value for each key in the streamlet</li>
</ul>
<p>Reduce by key operations produce a new key-value streamlet in which each element pairs an extracted key with the value calculated for that key by the reduce function. Here's an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;
Builder builder = Builder.newBuilder();
builder.newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)
<span class="hljs-comment">// Convert each sentence into individual words</span>
.flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
.reduceByKey(
<span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span>
word -&gt; word,
<span class="hljs-comment">// Value extractor (each occurrence of a word counts as 1)</span>
word -&gt; <span class="hljs-number">1</span>,
<span class="hljs-comment">// Reduce operation (a running sum)</span>
(x, y) -&gt; x + y
)
<span class="hljs-comment">// The result is logged</span>
.log();
</code></pre>
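<p>Conceptually, a reduce by key operation keeps one accumulated value per key and folds each newly extracted value into it with the reduce function. The plain-Java sketch below applies the same key extractor, value extractor, and reduce function as the example above to a finite list of words, using <code>Map.merge</code> as the per-key accumulator. The <code>ReduceByKeySketch</code> class is hypothetical and is not how Heron implements the operator; because a real streamlet is unbounded, the sketch only shows the totals after a fixed batch of input and does not model when Heron emits results.</p>
<pre><code class="hljs css language-java">import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical illustration class (not part of the Heron API)
class ReduceByKeySketch {
  static &lt;R, K, V&gt; Map&lt;K, V&gt; reduceByKey(
      List&lt;R&gt; elements,
      Function&lt;R, K&gt; keyExtractor,
      Function&lt;R, V&gt; valueExtractor,
      BinaryOperator&lt;V&gt; reduceFn) {
    // LinkedHashMap keeps keys in first-seen order, which makes the output predictable
    Map&lt;K, V&gt; accumulated = new LinkedHashMap&lt;&gt;();
    for (R element : elements) {
      // merge() stores the value for a new key, or combines it with the current value via reduceFn
      accumulated.merge(keyExtractor.apply(element), valueExtractor.apply(element), reduceFn);
    }
    return accumulated;
  }

  public static void main(String[] args) {
    List&lt;String&gt; words = Arrays.asList("mary had a little lamb mary".split("\\s+"));
    Map&lt;String, Integer&gt; counts = reduceByKey(words, word -&gt; word, word -&gt; 1, (x, y) -&gt; x + y);
    System.out.println(counts); // {mary=2, had=1, a=1, little=1, lamb=1}
  }
}
</code></pre>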
<h3><a class="anchor" aria-hidden="true" id="reduce-by-key-and-window-operations"></a><a href="#reduce-by-key-and-window-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reduce by key and window operations</h3>
<p>You can apply <a href="https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html">reduce</a> operations to streamlets by specifying:</p>
<ul>
<li>a key extractor that determines the key for each element of the streamlet</li>
<li>a value extractor that determines the value taken from each element, which is then passed to the reduce function</li>
<li>a <a href="../../../concepts/topologies#window-operations">time window</a> across which the operation will take place</li>
<li>a reduce function that produces a single value for each key in the streamlet</li>
</ul>
<p>Reduce by key and window operations produce a new streamlet of key-value window objects (which include a key-value pair including the extracted key and calculated value, as well as information about the window in which the operation took place). Here's an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.WindowConfig;
Builder builder = Builder.newBuilder();
builder.newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)
<span class="hljs-comment">// Convert each sentence into individual words</span>
.flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
.reduceByKeyAndWindow(
<span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span>
word -&gt; word,
<span class="hljs-comment">// Value extractor (each occurrence of a word counts as 1)</span>
word -&gt; <span class="hljs-number">1</span>,
<span class="hljs-comment">// Window configuration</span>
WindowConfig.TumblingCountWindow(<span class="hljs-number">50</span>),
<span class="hljs-comment">// Reduce operation (a running sum)</span>
(x, y) -&gt; x + y
)
<span class="hljs-comment">// The result is logged</span>
.log();
</code></pre>
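<p>The window configuration changes <em>which</em> elements are reduced together. As a rough illustration of <code>WindowConfig.TumblingCountWindow(50)</code>, the plain-Java sketch below cuts a finite input into consecutive, non-overlapping windows of a fixed number of elements and reduces each window independently. The <code>TumblingCountWindowSketch</code> class is hypothetical and is not Heron's windowing implementation; it also assumes that a window is only reduced once it has filled up, so a trailing partial window is not reported.</p>
<pre><code class="hljs css language-java">import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical illustration class (not part of the Heron API)
class TumblingCountWindowSketch {
  // Returns one key-to-reduced-value map per complete window of windowSize elements
  static &lt;R, K, V&gt; List&lt;Map&lt;K, V&gt;&gt; reduceByKeyAndWindow(
      List&lt;R&gt; elements,
      Function&lt;R, K&gt; keyExtractor,
      Function&lt;R, V&gt; valueExtractor,
      int windowSize,
      BinaryOperator&lt;V&gt; reduceFn) {
    if (windowSize &lt;= 0) {
      throw new IllegalArgumentException("windowSize must be positive");
    }
    List&lt;Map&lt;K, V&gt;&gt; windows = new ArrayList&lt;&gt;();
    // Tumbling windows do not overlap: each element belongs to exactly one window
    for (int start = 0; start + windowSize &lt;= elements.size(); start += windowSize) {
      Map&lt;K, V&gt; window = new LinkedHashMap&lt;&gt;();
      for (R element : elements.subList(start, start + windowSize)) {
        window.merge(keyExtractor.apply(element), valueExtractor.apply(element), reduceFn);
      }
      windows.add(window);
    }
    return windows;
  }

  public static void main(String[] args) {
    List&lt;String&gt; words =
        Arrays.asList("mary had a little lamb little lamb mary".split("\\s+"));
    // A window size of 4 instead of 50 keeps the output short
    List&lt;Map&lt;String, Integer&gt;&gt; perWindow =
        reduceByKeyAndWindow(words, word -&gt; word, word -&gt; 1, 4, (x, y) -&gt; x + y);
    System.out.println(perWindow); // [{mary=1, had=1, a=1, little=1}, {lamb=2, little=1, mary=1}]
  }
}
</code></pre>
<p>Note how <code>mary</code> is counted once in each window rather than twice overall: counts are reset at every window boundary.</p>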
<h3><a class="anchor" aria-hidden="true" id="count-by-key-operations"></a><a href="#count-by-key-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Count by key operations</h3>
<p>Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered. Here's an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;
Builder builder = Builder.newBuilder();
builder.newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)
<span class="hljs-comment">// Convert each sentence into individual words</span>
.flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
.countByKey(word -&gt; word)
<span class="hljs-comment">// The result is logged</span>
.log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="count-by-key-and-window-operations"></a><a href="#count-by-key-and-window-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Count by key and window operations</h3>
<p>Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each <a href="../../../concepts/topologies#window-operations">time window</a>. Here's an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.WindowConfig;
Builder builder = Builder.newBuilder();
builder.newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)
<span class="hljs-comment">// Convert each sentence into individual words</span>
.flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
.countByKeyAndWindow(
<span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span>
word -&gt; word,
<span class="hljs-comment">// Window configuration</span>
WindowConfig.TumblingCountWindow(<span class="hljs-number">50</span>)
)
<span class="hljs-comment">// The result is logged</span>
.log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="split-operations"></a><a href="#split-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Split operations</h3>
<p>Split operations split a streamlet into multiple streams, each identified by a stream ID. You supply a map from stream IDs to predicates, and each element of the original streamlet is routed to the streams whose predicates it satisfies. Here is an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;
<span class="hljs-keyword">import</span> java.util.HashMap;
<span class="hljs-keyword">import</span> java.util.Map;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.SerializablePredicate;
Map&lt;String, SerializablePredicate&lt;String&gt;&gt; splitter = <span class="hljs-keyword">new</span> HashMap&lt;&gt;();
splitter.put(<span class="hljs-string">"long_word"</span>, s -&gt; s.length() &gt;= <span class="hljs-number">4</span>);
splitter.put(<span class="hljs-string">"short_word"</span>, s -&gt; s.length() &lt; <span class="hljs-number">4</span>);
Builder builder = Builder.newBuilder();
builder.newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)
<span class="hljs-comment">// Convert each sentence into individual words</span>
.flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
<span class="hljs-comment">// Splits the stream into streams of long and short words</span>
.split(splitter)
<span class="hljs-comment">// Choose the stream of the short words</span>
.withStream(<span class="hljs-string">"short_word"</span>)
<span class="hljs-comment">// The result is logged</span>
.log();
</code></pre>
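<p>The routing rule can be modeled in plain Java without Heron. This sketch assumes that <code>split</code> sends each element to every stream whose predicate accepts it and that <code>withStream</code> picks one stream by ID. <code>SplitSketch</code> and its methods are hypothetical, <code>java.util.function.Predicate</code> stands in for <code>SerializablePredicate</code>, and treating an unknown ID as an error is a choice of this model only.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of split + withStream. Not Heron code.
class SplitSketch {

    // Routes each element to every stream whose predicate accepts it.
    static <T> Map<String, List<T>> split(List<T> elements, Map<String, Predicate<T>> splitter) {
        Map<String, List<T>> streams = new LinkedHashMap<>();
        for (String id : splitter.keySet()) {
            streams.put(id, new ArrayList<>());
        }
        for (T element : elements) {
            for (Map.Entry<String, Predicate<T>> entry : splitter.entrySet()) {
                if (entry.getValue().test(element)) {
                    streams.get(entry.getKey()).add(element);
                }
            }
        }
        return streams;
    }

    // Selects one stream by ID; in this model an unknown ID is an error.
    static <T> List<T> withStream(Map<String, List<T>> streams, String id) {
        List<T> stream = streams.get(id);
        if (stream == null) {
            throw new IllegalArgumentException("Unknown stream id: " + id);
        }
        return stream;
    }

    public static void main(String[] args) {
        Map<String, Predicate<String>> splitter = new LinkedHashMap<>();
        splitter.put("long_word", s -> s.length() >= 4);
        splitter.put("short_word", s -> s.length() < 4);
        List<String> words = Arrays.asList("mary had a little lamb".split("\\s+"));
        System.out.println(withStream(split(words, splitter), "short_word"));
    }
}
```

<p>The two predicates in the example are mutually exclusive, so each word lands in exactly one stream. With overlapping predicates, this model places an element in several streams.</p>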
<h3><a class="anchor" aria-hidden="true" id="with-stream-operations"></a><a href="#with-stream-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>With stream operations</h3>
<p>With stream operations select a single stream, by its ID, from a streamlet that contains multiple streams. They are typically used after a <a href="#split-operations">split</a> operation.</p>
<h3><a class="anchor" aria-hidden="true" id="apply-operator-operations"></a><a href="#apply-operator-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Apply operator operations</h3>
<p>Apply operator operations apply a user-defined operator (such as a bolt) to each element of the original streamlet and return a new streamlet. Here is an example:</p>
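<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.Builder;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.IStreamletRichOperator;
<span class="hljs-comment">// MyBolt stands for your own bolt implementation. The type parameters are</span>
<span class="hljs-comment">// the input and output element types (String words in, String out).</span>
<span class="hljs-keyword">private</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">MyBoltOperator</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">MyBolt</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">IStreamletRichOperator</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">String</span>&gt; </span>{
}
Builder builder = Builder.newBuilder();
builder.newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)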
<span class="hljs-comment">// Convert each sentence into individual words</span>
.flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
<span class="hljs-comment">// Apply user defined operation</span>
.applyOperator(<span class="hljs-keyword">new</span> MyBoltOperator())
<span class="hljs-comment">// The result is logged</span>
.log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="repartition-operations"></a><a href="#repartition-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Repartition operations</h3>
<p>When you assign a number of <a href="#partitioning-and-parallelism">partitions</a> to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a <code>map</code> operation, then any <code>mapToKV</code>, <code>flatMap</code>, <code>filter</code>, etc. operations that come after it will also be assigned 5 partitions. You can, however, change the number of partitions for a processing step (and for all operations downstream of it) using <code>repartition</code>. Here's an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.concurrent.ThreadLocalRandom;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.Builder;
Builder builder = Builder.newBuilder();
builder.newSource(() -&gt; ThreadLocalRandom.current().nextInt(<span class="hljs-number">1</span>, <span class="hljs-number">11</span>))
.setNumPartitions(<span class="hljs-number">5</span>)
.map(i -&gt; i + <span class="hljs-number">1</span>)
.repartition(<span class="hljs-number">2</span>)
<span class="hljs-comment">// Keep only values strictly between 2 and 7</span>
.filter(i -&gt; i &gt; <span class="hljs-number">2</span> &amp;&amp; i &lt; <span class="hljs-number">7</span>)
.log();
</code></pre>
<p>In this example, the source streamlet emits random integers between 1 and 10 (inclusive) and is assigned 5 partitions, which the <code>map</code> operation inherits. After the <code>map</code> operation, the <code>repartition</code> function is used to assign 2 partitions to all downstream operations (here, <code>filter</code> and <code>log</code>).</p>
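<p>The inheritance rule can be illustrated with a small plain-Java model (not Heron code). Each hypothetical <code>Step</code> either sets a partition count explicitly, as <code>setNumPartitions</code> or <code>repartition</code> would, or inherits the count of the step before it. The chain in <code>main</code> mirrors the example above; the <code>defaultPartitions</code> argument is an assumption of the model for a first step that sets nothing.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of partition-count inheritance along a chain of steps.
// Not Heron code: Step and effectivePartitions are hypothetical.
class PartitionPropagationSketch {

    // One processing step. explicitPartitions > 0 means the step sets its own
    // count (as setNumPartitions or repartition would); 0 means "inherit".
    static final class Step {
        final String name;
        final int explicitPartitions;

        Step(String name, int explicitPartitions) {
            this.name = name;
            this.explicitPartitions = explicitPartitions;
        }
    }

    // Returns the effective partition count of each step, in order.
    // defaultPartitions is what this model gives a first step that sets nothing.
    static List<Integer> effectivePartitions(List<Step> steps, int defaultPartitions) {
        List<Integer> result = new ArrayList<>();
        int current = defaultPartitions;
        for (Step step : steps) {
            if (step.explicitPartitions > 0) {
                current = step.explicitPartitions;
            }
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Mirrors the example: source(5) -> map -> repartition(2) -> filter -> log
        List<Step> chain = Arrays.asList(
            new Step("source", 5),
            new Step("map", 0),
            new Step("repartition", 2),
            new Step("filter", 0),
            new Step("log", 0));
        List<Integer> counts = effectivePartitions(chain, 1);
        for (int i = 0; i < chain.size(); i++) {
            System.out.println(chain.get(i).name + ": " + counts.get(i));
        }
    }
}
```

<p>In this model, <code>source</code> and <code>map</code> report 5 partitions, while every step from <code>repartition</code> onward reports 2.</p>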
<h3><a class="anchor" aria-hidden="true" id="sink-operations"></a><a href="#sink-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Sink operations</h3>
<p>In processing graphs like the ones you build using the Heron Streamlet API, <strong>sinks</strong> are essentially the terminal points in your graph, where your processing logic comes to an end. A processing graph can end with writing to a database, publishing to a topic in a pub-sub messaging system, and so on. With the Streamlet API, you can implement your own custom sinks. Here's an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.Context;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.Sink;
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">FormattedLogSink</span>&lt;<span class="hljs-title">T</span>&gt; <span class="hljs-keyword">implements</span> <span class="hljs-title">Sink</span>&lt;<span class="hljs-title">T</span>&gt; </span>{
<span class="hljs-keyword">private</span> String streamletName;
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">setup</span><span class="hljs-params">(Context context)</span> </span>{
streamletName = context.getStreamName();
}
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">put</span><span class="hljs-params">(T element)</span> </span>{
String message = String.format(<span class="hljs-string">"Streamlet %s has produced an element with a value of: '%s'"</span>,
streamletName,
element.toString());
System.out.println(message);
}
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">cleanup</span><span class="hljs-params">()</span> </span>{}
}
</code></pre>
<p>In this example, the sink fetches the name of its stream from the context passed to the <code>setup</code> method. The <code>put</code> method specifies how the sink handles each element it receives (in this case, a formatted message is printed to stdout). The <code>cleanup</code> method lets you specify teardown logic, such as closing connections, that runs when the sink is shut down.</p>
<p>Here is the <code>FormattedLogSink</code> at work in an example processing graph:</p>
<pre><code class="hljs css language-java">Builder builder = Builder.newBuilder();
builder.newSource(() -&gt; <span class="hljs-string">"Here is a string to be passed to the sink"</span>)
.toSink(<span class="hljs-keyword">new</span> FormattedLogSink&lt;&gt;());
</code></pre>
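<p>The order in which the three sink methods are called can be sketched in plain Java. <code>SimpleSink</code> below is a hypothetical stand-in for Heron's <code>Sink</code> (it takes a stream name instead of a <code>Context</code>), and <code>run</code> assumes the lifecycle described above: <code>setup</code> once, <code>put</code> once per element, and <code>cleanup</code> once at shutdown. It is a model for reasoning about sinks, not Heron's runtime.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for org.apache.heron.streamlet.Sink, without Context.
interface SimpleSink<T> {
    void setup(String streamName);
    void put(T element);
    void cleanup();
}

// Plain-Java model of the sink lifecycle. Not Heron code.
class SinkLifecycleSketch {

    // Drives a sink as this model assumes the runtime does:
    // setup once, put for every element, cleanup once at the end.
    static <T> void run(SimpleSink<T> sink, String streamName, List<T> elements) {
        sink.setup(streamName);
        try {
            for (T element : elements) {
                sink.put(element);
            }
        } finally {
            sink.cleanup();
        }
    }

    // A sink that records every lifecycle call so the order can be inspected.
    static final class RecordingSink<T> implements SimpleSink<T> {
        final List<String> calls = new ArrayList<>();
        private String streamName;

        @Override
        public void setup(String streamName) {
            this.streamName = streamName;
            calls.add("setup");
        }

        @Override
        public void put(T element) {
            calls.add(String.format("put %s:%s", streamName, element));
        }

        @Override
        public void cleanup() {
            calls.add("cleanup");
        }
    }

    public static void main(String[] args) {
        RecordingSink<String> sink = new RecordingSink<>();
        run(sink, "words", Arrays.asList("mary", "lamb"));
        System.out.println(sink.calls);
    }
}
```

<p>Wrapping the loop in <code>try</code>/<code>finally</code> is a design choice of this sketch so that <code>cleanup</code> runs even if <code>put</code> throws; it does not claim that Heron behaves the same way.</p>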
<blockquote>
<p><a href="#log-operations">Log operations</a> rely on a log sink that is provided out of the box. You'll need to implement other sinks yourself.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="log-operations"></a><a href="#log-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Log operations</h3>
<p>Log operations are special cases of consume operations that log streamlet elements to stdout.</p>
<blockquote>
<p>Streamlet elements are logged at the <code>INFO</code> level, using their <code>toString</code> representations.</p>
</blockquote>
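<p>As a rough illustration of how a log operation relates to a consume operation, the plain-Java sketch below builds a consumer that writes <code>String.valueOf(element)</code> at the <code>INFO</code> level with <code>java.util.logging</code>. This is an assumption about the behavior described above, not Heron's actual log sink; <code>LogAsConsumeSketch</code>, <code>loggingConsumer</code>, and <code>CapturingHandler</code> are hypothetical names, and the handler only exists so the logged records can be inspected.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Handler;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Plain-Java model: a "log" step expressed as a consumer. Not Heron code.
class LogAsConsumeSketch {

    // Logs each element's string form at INFO. String.valueOf also
    // handles null elements by logging the text "null".
    static <T> Consumer<T> loggingConsumer(Logger logger) {
        return element -> logger.info(String.valueOf(element));
    }

    // Collects log records in memory instead of printing them.
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            records.add(record);
        }

        @Override
        public void flush() {}

        @Override
        public void close() {}
    }

    public static void main(String[] args) {
        Logger logger = Logger.getLogger("LogAsConsumeSketch");
        CapturingHandler handler = new CapturingHandler();
        logger.addHandler(handler);
        Consumer<Object> log = loggingConsumer(logger);
        log.accept(42);
        log.accept("lamb");
        System.out.println(handler.records.size() + " records captured");
    }
}
```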
<h3><a class="anchor" aria-hidden="true" id="consume-operations"></a><a href="#consume-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consume operations</h3>
<p>Consume operations are like <a href="#sink-operations">sink operations</a> except they don't require implementing a full sink interface. Consume operations are thus suited for simple operations like formatted logging. Here's an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.concurrent.ThreadLocalRandom;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.Builder;
Builder builder = Builder.newBuilder();
builder.newSource(() -&gt; ThreadLocalRandom.current().nextInt(<span class="hljs-number">1</span>, <span class="hljs-number">11</span>))
.filter(i -&gt; i % <span class="hljs-number">2</span> == <span class="hljs-number">0</span>)
.consume(i -&gt; {
String message = String.format(<span class="hljs-string">"Even number found: %d"</span>, i);
System.out.println(message);
});
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/0.20.4-incubating/deployment-api-server"><span class="arrow-prev"></span><span>The Heron API Server</span></a><a class="docs-next button" href="/docs/0.20.4-incubating/topology-development-eco-api"><span>The ECO API for Java</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#the-heron-streamlet-api-vs-the-topology-api">The Heron Streamlet API vs. The Topology API</a></li><li><a href="#getting-started">Getting started</a><ul class="toc-headings"><li><a href="#maven-setup">Maven setup</a></li><li><a href="#java-streamlet-api-starter-project">Java Streamlet API starter project</a></li></ul></li><li><a href="#streamlet-api-topology-configuration">Streamlet API topology configuration</a><ul class="toc-headings"><li><a href="#delivery-semantics">Delivery semantics</a></li></ul></li><li><a href="#streamlets">Streamlets</a></li><li><a href="#operations">Operations</a><ul class="toc-headings"><li><a href="#map-operations">Map operations</a></li><li><a href="#flatmap-operations">FlatMap operations</a></li><li><a href="#filter-operations">Filter operations</a></li><li><a href="#union-operations">Union operations</a></li><li><a href="#clone-operations">Clone operations</a></li><li><a href="#transform-operations">Transform operations</a></li><li><a href="#join-operations">Join operations</a></li><li><a href="#key-by-operations">Key by operations</a></li><li><a href="#reduce-by-key-operations">Reduce by key operations</a></li><li><a href="#reduce-by-key-and-window-operations">Reduce by key and window operations</a></li><li><a href="#count-by-key-operations">Count by key operations</a></li><li><a href="#count-by-key-and-window-operations">Count by key and window operations</a></li><li><a href="#split-operations">Split operations</a></li><li><a href="#with-stream-operations">With stream operations</a></li><li><a 
href="#apply-operator-operations">Apply operator operations</a></li><li><a href="#repartition-operations">Repartition operations</a></li><li><a href="#sink-operations">Sink operations</a></li><li><a href="#log-operations">Log operations</a></li><li><a href="#consume-operations">Consume operations</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><div class="apache-disclaimer">Apache Heron is an effort undergoing incubation at <a target="_blank" href="https://apache.org/">The Apache Software Foundation (ASF)</a> sponsored by the Apache Incubator PMC. Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects. While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.<br/><br/>Apache®, the names of Apache projects, and the feather logo are either <a rel="external" href="https://www.apache.org/foundation/marks/list/">registered trademarks or trademarks</a> of the Apache Software Foundation in the United States and/or other countries.<br/><br/><div class="copyright-box">Copyright © 2023 the Apache Software Foundation, Apache Heron, Heron,
Apache, the Apache feather Logo, and the Apache Heron project logo are either registered
trademarks or trademarks of the Apache Software Foundation.</div></div><div class="apache-links"><a class="item" rel="external" href="https://incubator.apache.org/">Apache Incubator</a><div><a class="item" rel="external" href="https://www.apache.org/">About the ASF</a></div><div><a class="item" rel="external" href="https://www.apache.org/events/current-event">Events</a></div><div><a class="item" rel="external" href="https://www.apache.org/foundation/thanks.html">Thanks</a></div><div><a class="item" rel="external" href="https://www.apache.org/foundation/sponsorship.html">Become a Sponsor</a></div><div><a class="item" rel="external" href="https://www.apache.org/security/">Security</a></div><div><a class="item" rel="external" href="https://www.apache.org/licenses/">License</a></div></div></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>