<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Heron Streamlets · Apache Heron</title><meta name="viewport" content="width=device-width"/><meta name="generator" content="Docusaurus"/><meta name="description" content="&lt;!--"/><meta name="docsearch:version" content="0.20.1-incubating"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Heron Streamlets · Apache Heron"/><meta property="og:type" content="website"/><meta property="og:url" content="https://heron.apache.org/"/><meta property="og:description" content="&lt;!--"/><meta property="og:image" content="https://heron.apache.org/img/undraw_online.svg"/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://heron.apache.org/img/undraw_tweetstorm.svg"/><link rel="shortcut icon" href="/img/favicon-32x32.png"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/default.min.css"/><link rel="alternate" type="application/atom+xml" href="https://heron.apache.org/blog/atom.xml" title="Apache Heron Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://heron.apache.org/blog/feed.xml" title="Apache Heron Blog RSS Feed"/><script>
              (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
              (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
              m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
              })(window,document,'script','https://www.google-analytics.com/analytics.js','ga');

              ga('create', 'UA-198017384-1', 'auto');
              ga('send', 'pageview');
            </script><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="/js/custom.js"></script><script type="text/javascript" src="/js/fix-location.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/"><img class="logo" src="/img/HeronTextLogo-small.png" alt="Apache Heron"/><h2 class="headerTitleWithLogo">Apache Heron</h2></a><a href="/versions"><h3>0.20.1-incubating</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class=""><a href="/api/java" target="_self">Javadocs</a></li><li class=""><a href="/api/python" target="_self">Pydocs</a></li><li class="siteNavGroupActive"><a href="/docs/0.20.1-incubating/getting-started-local-single-node" target="_self">Docs</a></li><li class=""><a href="/download" target="_self">Downloads</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#apache" target="_self">Apache</a></li></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="container docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i>›</i><span>Heron Concepts</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Getting Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/getting-started-local-single-node">Local (Single Node)</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/getting-started-migrate-storm-topologies">Migrate Storm Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/getting-started-troubleshooting-guide">Troubleshooting Guide</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/deployment-overview">Deployment Overiew</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/deployment-configuration">Configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/deployment-api-server">The Heron API Server</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Topology Development APIs</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/topology-development-streamlet-api">The Heron Streamlet API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/topology-development-eco-api">The ECO API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/topology-development-topology-api-java">The Heron Topology API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/topology-development-topology-api-python">The Heron Topology API for Python</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/topology-development-streamlet-scala">The Heron Streamlet API for Scala</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client API Docs</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/client-api-docs-overview">Client API Docs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Guides</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-effectively-once-java-topologies">Effectively Once Java Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-data-model">Heron Data Model</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-tuple-serialization">Tuple Serialization</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-ui-guide">Heron UI Guide</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-topology-tuning">Topology Tuning Guide</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-packing-algorithms">Packing Algorithms</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-simulator-mode">Simulator Mode</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/guides-troubeshooting-guide">Topology Troubleshooting Guide</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Heron Concepts</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/heron-design-goals">Heron Design Goals</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/heron-topology-concepts">Heron Topologies</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/0.20.1-incubating/heron-streamlet-concepts">Heron Streamlets</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/heron-architecture">Heron Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/heron-delivery-semantics">Heron Delivery Semantics</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">State Managers</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/state-managers-zookeeper">Zookeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/state-managers-local-fs">Local File System</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Uploaders</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/uploaders-local-fs">Local File System</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/uploaders-hdfs">HDFS</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/uploaders-http">HTTP</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/uploaders-amazon-s3">Amazon S3</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/uploaders-scp">Secure Copy (SCP)</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Schedulers</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-k8s-by-hand">Kubernetes by hand</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-k8s-with-helm">Kubernetes with Helm</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-aurora-cluster">Aurora Cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-aurora-local">Aurora Locally</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-local">Local Cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-nomad">Nomad</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-mesos-local-mac">Mesos Cluster Locally</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-slurm">Slurm Cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/schedulers-yarn">YARN Cluster</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cluster Configuration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/cluster-config-overview">Cluster Config Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/cluster-config-system-level">System Level Configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/cluster-config-instance">Heron Instance</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/cluster-config-metrics">Metrics Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/cluster-config-stream">Stream Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/cluster-config-tmanager">Topology Manager</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Observability</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/observability-prometheus">Prometheus</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/observability-graphite">Graphite</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/observability-scribe">Scribe</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">User Manuals</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/user-manuals-heron-cli">Heron Client</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/user-manuals-heron-explorer">Heron Explorer</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/user-manuals-tracker-rest">Heron Tracker REST API</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/user-manuals-heron-tracker-runbook">Heron Tracker Runbook</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/user-manuals-heron-ui-runbook">Heron UI Runbook</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/user-manuals-heron-shell">Heron Shell</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Compiling</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/compiling-overview">Compiling Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/compiling-linux">Compiling on Linux</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/compiling-osx">Compiling on OS X</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/compiling-docker">Compiling With Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/compiling-running-tests">Running Tests</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/compiling-code-organization">Code Organization</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Extending Heron</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/extending-heron-scheduler">Custom Scheduler</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/extending-heron-metric-sink">Custom Metrics Sink</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Heron Resources</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.1-incubating/heron-resources-resources">Heron Resources</a></li></ul></div></div></section></div><script>
            var coll = document.getElementsByClassName('collapsible');
            var checkActiveCategory = true;
            for (var i = 0; i < coll.length; i++) {
              var links = coll[i].nextElementSibling.getElementsByTagName('*');
              if (checkActiveCategory){
                for (var j = 0; j < links.length; j++) {
                  if (links[j].classList.contains('navListItemActive')){
                    coll[i].nextElementSibling.classList.toggle('hide');
                    coll[i].childNodes[1].classList.toggle('rotate');
                    checkActiveCategory = false;
                    break;
                  }
                }
              }

              coll[i].addEventListener('click', function() {
                var arrow = this.childNodes[1];
                arrow.classList.toggle('rotate');
                var content = this.nextElementSibling;
                content.classList.toggle('hide');
              });
            }

            document.addEventListener('DOMContentLoaded', function() {
              createToggler('#navToggler', '#docsNav', 'docsSliderActive');
              createToggler('#tocToggler', 'body', 'tocActive');

              var headings = document.querySelector('.toc-headings');
              headings && headings.addEventListener('click', function(event) {
                var el = event.target;
                while(el !== headings){
                  if (el.tagName === 'A') {
                    document.body.classList.remove('tocActive');
                    break;
                  } else{
                    el = el.parentNode;
                  }
                }
              }, false);

              function createToggler(togglerSelector, targetSelector, className) {
                var toggler = document.querySelector(togglerSelector);
                var target = document.querySelector(targetSelector);

                if (!toggler) {
                  return;
                }

                toggler.onclick = function(event) {
                  event.preventDefault();

                  target.classList.toggle(className);
                };
              }
            });
        </script></nav></div><div class="container mainContainer"><div class="wrapper"><div class="post"><header class="postHeader"><h1 class="postHeaderTitle">Heron Streamlets</h1></header><article><div><span><!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
      http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
-->
<p>When it was first released, Heron offered a <strong>Topology API</strong>---heavily indebted to the <a href="http://storm.apache.org/about/simple-api.html">Storm API</a>---for developing topology logic. In the original Topology API, developers creating topologies were required to explicitly:</p>
<ul>
<li>define the behavior of every <a href="topology-development-topology-api-java#spouts">spout</a> and <a href="topology-development-topology-api-java#bolts">bolt</a> in the topology</li>
<li>specify how those spouts and bolts are meant to be interconnected</li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="problems-with-the-topology-api"></a><a href="#problems-with-the-topology-api" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Problems with the Topology API</h3>
<p>Although the Storm-inspired API provided a powerful low-level interface for creating topologies, the spouts-and-bolts model also presented a variety of drawbacks for Heron developers:</p>
<table>
<thead>
<tr><th style="text-align:left">Drawback</th><th style="text-align:left">Description</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">Verbosity</td><td style="text-align:left">In the original Topology API for both Java and Python, creating spouts and bolts required substantial boilerplate and forced developers to both provide implementations for spout and bolt classes and also to specify the connections between those spouts and bolts.</td></tr>
<tr><td style="text-align:left">Difficult debugging</td><td style="text-align:left">When spouts, bolts, and the connections between them need to be created &quot;by hand,&quot; it can be challenging to trace the origin of problems in the topology's processing chain</td></tr>
<tr><td style="text-align:left">Tuple-based data model</td><td style="text-align:left">In the older topology API, spouts and bolts passed <a href="https://en.wikipedia.org/wiki/Tuple">tuples</a> and nothing but tuples within topologies. Although tuples are a powerful and flexible data type, the topology API forced <em>all</em> spouts and bolts to implement their own serialization/deserialization logic.</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="advantages-of-the-streamlet-api"></a><a href="#advantages-of-the-streamlet-api" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Advantages of the Streamlet API</h3>
<p>In contrast with the Topology API, the Heron Streamlet API offers:</p>
<table>
<thead>
<tr><th style="text-align:left">Advantage</th><th style="text-align:left">Description</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">Boilerplate-free code</td><td style="text-align:left">Instead of needing to implement spout and bolt classes over and over again, the Heron Streamlet API enables you to create stream processing logic out of functions, such as map, flatMap, join, and filter functions, instead.</td></tr>
<tr><td style="text-align:left">Easy debugging</td><td style="text-align:left">With the Heron Streamlet API, you don't have to worry about spouts and bolts, which means that you can more easily surface problems with your processing logic.</td></tr>
<tr><td style="text-align:left">Completely flexible, type-safe data model</td><td style="text-align:left">Instead of requiring that all processing components pass tuples to one another (which implicitly requires serialization to and deserializaton from your application-specific types), the Heron Streamlet API enables you to write your processing logic in accordance with whatever types you'd like---including tuples, if you wish.<br /><br />In the Streamlet API for <a href="topology-development-streamlet-api">Java</a>, all streamlets are typed (e.g. <code>Streamlet&lt;MyApplicationType&gt;</code>), which means that type errors can be caught at compile time rather than at runtime.</td></tr>
</tbody>
</table>
<h2><a class="anchor" aria-hidden="true" id="streamlet-api-topology-model"></a><a href="#streamlet-api-topology-model" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Streamlet API topology model</h2>
<p>Instead of spouts and bolts, as with the Topology API, the Streamlet API enables you to create <strong>processing graphs</strong> that are then automatically converted to spouts and bolts under the hood. Processing graphs consist of the following components:</p>
<ul>
<li><strong>Sources</strong> supply the processing graph with data from random generators, databases, web service APIs, filesystems, pub-sub messaging systems, or anything that implements the <a href="#source-operations">source</a> interface.</li>
<li><strong>Operators</strong> supply the graph's processing logic, operating on data passed into the graph by sources.</li>
<li><strong>Sinks</strong> are the terminal endpoints of the processing graph, determining what the graph <em>does</em> with the processed data. Sinks can involve storing data in a database, logging results to stdout, publishing messages to a topic in a pub-sub messaging system, and much more.</li>
</ul>
<p>The diagram below illustrates both the general model (with a single source, three operators, and one sink), and a more concrete example that includes two sources (an <a href="https://pulsar.incubator.apache.org">Apache Pulsar</a> topic and the <a href="https://developer.twitter.com/en/docs">Twitter API</a>), three operators (a <a href="#join-operations">join</a>, <a href="#flatmap-operations">flatMap</a>, and <a href="#reduce-operations">reduce</a> operation), and two <a href="#sink-operations">sinks</a> (an <a href="http://cassandra.apache.org/">Apache Cassandra</a> table and an <a href="https://spark.apache.org/">Apache Spark</a> job).</p>
<p><img src="https://www.lucidchart.com/publicSegments/view/d84026a1-d12e-4878-b8d5-5aa274ec0415/image.png" alt="Topology Operators"></p>
<h3><a class="anchor" aria-hidden="true" id="streamlets"></a><a href="#streamlets" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Streamlets</h3>
<p>The core construct underlying the Heron Streamlet API is that of the <strong>streamlet</strong>. A streamlet is an unbounded, ordered collection of <strong>elements</strong> of some data type (streamlets can consist of simple types like integers and strings or more complex, application-specific data types).</p>
<p><strong>Source streamlets</strong> supply a Heron processing graph with data inputs. These inputs can come from a wide variety of sources, such as pub-sub messaging systems like <a href="http://kafka.apache.org/">Apache
Kafka</a> and <a href="https://pulsar.incubator.apache.org">Apache Pulsar</a> (incubating), random generators, or static files like CSV or <a href="https://parquet.apache.org/">Apache Parquet</a> files.</p>
<p>Source streamlets can then be manipulated in a wide variety of ways. You can, for example:</p>
<ul>
<li>apply <a href="#map-operations">map</a>, <a href="#filter-operations">filter</a>, <a href="#flatmap-operations">flatMap</a>, and many other operations to them</li>
<li>apply operations, such as <a href="#join-operations">join</a> and <a href="#union-operations">union</a> operations, that combine streamlets together</li>
<li><a href="#reduce-by-key-and-window-operations">reduce</a> all elements in a streamlet to some single value, based on key</li>
<li>send data to <a href="#sink-operations">sinks</a> (store elements)</li>
</ul>
<p>The diagram below shows an example streamlet:</p>
<p><img src="https://www.lucidchart.com/publicSegments/view/5c451e53-46f8-4e36-86f4-9a11ca015c21/image.png" alt="Streamlet"></p>
<p>In this diagram, the <strong>source streamlet</strong> is produced by a random generator that continuously emits random integers between 1 and 100. From there:</p>
<ul>
<li>A filter operation is applied to the source streamlet that filters out all values less than or equal to 30</li>
<li>A <em>new streamlet</em> is produced by the filter operation (with the Heron Streamlet API, you're always transforming streamlets into other streamlets)</li>
<li>A map operation adds 15 to each item in the streamlet, which produces the final streamlet in our graph. We <em>could</em> hypothetically go much further and add as many transformation steps to the graph as we'd like.</li>
<li>Once the final desired streamlet is created, each item in the streamlet is sent to a sink. Sinks are where items leave the processing graph.</li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="supported-languages"></a><a href="#supported-languages" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Supported languages</h3>
<p>The Heron Streamlet API is currently available for:</p>
<ul>
<li><a href="topology-development-streamlet-api">Java</a></li>
<li><a href="topology-development-streamlet-scala">Scala</a></li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="the-heron-streamlet-api-and-topologies"></a><a href="#the-heron-streamlet-api-and-topologies" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>The Heron Streamlet API and topologies</h3>
<p>With the Heron Streamlet API <em>you still create topologies</em>, but only implicitly. Heron automatically performs the heavy lifting of converting the streamlet-based processing logic that you create into spouts and bolts and, from there, into containers that are then deployed using whichever <a href="/docs/0.20.1-incubating/schedulers-local">scheduler</a> your Heron cluster relies upon.</p>
<p>From the standpoint of both operators and developers <a href="#topology-lifecycle">managing topologies' lifecycles</a>, the resulting topologies are equivalent. From a development workflow standpoint, however, the difference is profound. You can think of the Streamlet API as a highly convenient tool for creating spouts, bolts, and the logic that connects them.</p>
<p>The basic workflow looks like this:</p>
<p><img src="https://www.lucidchart.com/publicSegments/view/6b2e9b49-ef1f-45c9-8094-1e2cefbaed7b/image.png" alt="Streamlet"></p>
<p>When creating topologies using the Heron Streamlet API, you simply write code (example <a href="#java-processing-graph-example">below</a>) in a highly functional style. From there:</p>
<ul>
<li>that code is automatically converted into spouts, bolts, and the necessary connective logic between spouts and bolts</li>
<li>the spouts and bolts are automatically converted into a <a href="topology-development-topology-api-java#logical-plan">logical plan</a> that specifies how the spouts and bolts are connected to each other</li>
<li>the logical plan is automatically converted into a <a href="topology-development-topology-api-java#physical-plan">physical plan</a> that determines how the spout and bolt instances (the colored boxes above) are distributed across the specified number of containers (in this case two)</li>
</ul>
<p>With a physical plan in place, the Streamlet API topology can be submitted to a Heron cluster.</p>
<h4><a class="anchor" aria-hidden="true" id="java-processing-graph-example"></a><a href="#java-processing-graph-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java processing graph example</h4>
<p>The code below shows how you could implement the processing graph shown <a href="#streamlets">above</a> in Java:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.concurrent.ThreadLocalRandom;

<span class="hljs-keyword">import</span> org.apache.heron.streamlet.Builder;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.Config;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.Runner;

Builder builder = Builder.newBuilder();

<span class="hljs-comment">// Function for generating random integers</span>
<span class="hljs-function"><span class="hljs-keyword">int</span> <span class="hljs-title">randomInt</span><span class="hljs-params">(<span class="hljs-keyword">int</span> lower, <span class="hljs-keyword">int</span> upper)</span> </span>{
    <span class="hljs-keyword">return</span> ThreadLocalRandom.current().nextInt(lower, upper + <span class="hljs-number">1</span>);
}

<span class="hljs-comment">// Source streamlet</span>
builder.newSource(() -&gt; randomInt(<span class="hljs-number">1</span>, <span class="hljs-number">100</span>))
    <span class="hljs-comment">// Filter operation</span>
    .filter(i -&gt; i &gt; <span class="hljs-number">30</span>)
    <span class="hljs-comment">// Map operation</span>
    .map(i -&gt; i + <span class="hljs-number">15</span>)
    <span class="hljs-comment">// Log sink</span>
    .log();

Config config = <span class="hljs-keyword">new</span> Config();
<span class="hljs-comment">// This topology will be spread across two containers</span>
config.setNumContainers(<span class="hljs-number">2</span>);

<span class="hljs-comment">// Submit the processing graph to Heron as a topology</span>
<span class="hljs-keyword">new</span> Runner(<span class="hljs-string">"IntegerProcessingGraph"</span>, config, builder).run();
</code></pre>
<p>As you can see, the Java code for the example streamlet processing graph requires very little boilerplate and is heavily indebted to Java 8 <a href="https://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html">lambda</a> patterns.</p>
<h2><a class="anchor" aria-hidden="true" id="streamlet-operations"></a><a href="#streamlet-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Streamlet operations</h2>
<p>In the Heron Streamlet API, processing data means <em>transforming streamlets into other streamlets</em>. This can be done using a wide variety of available operations, including many that you may be familiar with from functional programming:</p>
<table>
<thead>
<tr><th style="text-align:left">Operation</th><th style="text-align:left">Description</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="#map-operations">map</a></td><td style="text-align:left">Returns a new streamlet by applying the supplied mapping function to each element in the original streamlet</td></tr>
<tr><td style="text-align:left"><a href="#flatMap-operations">flatMap</a></td><td style="text-align:left">Like a map operation but with the important difference that each element of the streamlet is flattened into a collection type</td></tr>
<tr><td style="text-align:left"><a href="#filter-operations">filter</a></td><td style="text-align:left">Returns a new streamlet containing only the elements that satisfy the supplied filtering function</td></tr>
<tr><td style="text-align:left"><a href="#filter-operations">union</a></td><td style="text-align:left">Unifies two streamlets into one, without <a href="#windowing">windowing</a> or modifying the elements of the two streamlets</td></tr>
<tr><td style="text-align:left"><a href="#clone-operations">clone</a></td><td style="text-align:left">Creates any number of identical copies of a streamlet</td></tr>
<tr><td style="text-align:left"><a href="#transform-operations">transform</a></td><td style="text-align:left">Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations)</td><td>Modify the elements from an incoming streamlet and update the topology's state</td></tr>
<tr><td style="text-align:left"><a href="#key-by-operations">keyBy</a></td><td style="text-align:left">Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet</td></tr>
<tr><td style="text-align:left"><a href="#reduce-by-key-operations">reduceByKey</a></td><td style="text-align:left">Produces a streamlet of key-value on each key and in accordance with a reduce function that you apply to all the accumulated values</td></tr>
<tr><td style="text-align:left"><a href="#reduce-by-key-and-window-operations">reduceByKeyAndWindow</a></td><td style="text-align:left">Produces a streamlet of key-value on each key, within a <a href="#windowing">time window</a>, and in accordance with a reduce function that you apply to all the accumulated values</td></tr>
<tr><td style="text-align:left"><a href="#count-by-key-operations">countByKey</a></td><td style="text-align:left">A special reduce operation of counting number of tuples on each key</td></tr>
<tr><td style="text-align:left"><a href="#count-by-key-and-window-operations">countByKeyAndWindow</a></td><td style="text-align:left">A special reduce operation of counting number of tuples on each key, within a <a href="#windowing">time window</a></td></tr>
<tr><td style="text-align:left"><a href="#split-operations">split</a></td><td style="text-align:left">Split a streamlet into multiple streamlets with different id.</td></tr>
<tr><td style="text-align:left"><a href="#with-stream-operations">withStream</a></td><td style="text-align:left">Select a stream with id from a streamlet that contains multiple streams</td></tr>
<tr><td style="text-align:left"><a href="#apply-operator-operations">applyOperator</a></td><td style="text-align:left">Returns a new streamlet by applying an user defined operator to the original streamlet</td></tr>
<tr><td style="text-align:left"><a href="#join-operations">join</a></td><td style="text-align:left">Joins two separate key-value streamlets into a single streamlet on a key, within a <a href="#windowing">time window</a>, and in accordance with a join function</td></tr>
<tr><td style="text-align:left"><a href="#log-operations">log</a></td><td style="text-align:left">Logs the final streamlet output of the processing graph to stdout</td></tr>
<tr><td style="text-align:left"><a href="#sink-operations">toSink</a></td><td style="text-align:left">Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc.</td></tr>
<tr><td style="text-align:left"><a href="#consume-operations">consume</a></td><td style="text-align:left">Consume operations are like sink operations except they don't require implementing a full sink interface (consume operations are thus suited for simple operations like logging)</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="map-operations"></a><a href="#map-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Map operations</h3>
<p>Map operations create a new streamlet by applying the supplied mapping function to each element in the original streamlet.</p>
<h4><a class="anchor" aria-hidden="true" id="java-example"></a><a href="#java-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.Builder;

Builder processingGraphBuilder = Builder.newBuilder();

Streamlet&lt;Integer&gt; ones = processingGraphBuilder.newSource(() -&gt; <span class="hljs-number">1</span>);
Streamlet&lt;Integer&gt; thirteens = ones.map(i -&gt; i + <span class="hljs-number">12</span>);
</code></pre>
<p>In this example, a supplier streamlet emits an indefinite series of 1s. The <code>map</code> operation then adds 12 to each incoming element, producing a streamlet of 13s. The effect of this operation is to transform the <code>Streamlet&lt;Integer&gt;</code> into a <code>Streamlet&lt;Integer&gt;</code> with different values (map operations can also convert streamlets into streamlets of a different type).</p>
<h3><a class="anchor" aria-hidden="true" id="flatmap-operations"></a><a href="#flatmap-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>FlatMap operations</h3>
<p>FlatMap operations are like <a href="#map-operations">map operations</a> but with the important difference that each element of the streamlet is &quot;flattened&quot; into a collection type. In the Java example below, a supplier streamlet emits the same sentence over and over again; the <code>flatMap</code> operation transforms each sentence into a Java <code>List</code> of individual words.</p>
<h4><a class="anchor" aria-hidden="true" id="java-example-1"></a><a href="#java-example-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java">Streamlet&lt;String&gt; sentences = builder.newSource(() -&gt; <span class="hljs-string">"I have nothing to declare but my genius"</span>);
Streamlet&lt;List&lt;String&gt;&gt; words = sentences
        .flatMap((sentence) -&gt; Arrays.asList(sentence.split(<span class="hljs-string">"\\s+"</span>)));
</code></pre>
<p>The effect of this operation is to transform the <code>Streamlet&lt;String&gt;</code> into a <code>Streamlet&lt;List&lt;String&gt;&gt;</code> containing each word emitted by the source streamlet.</p>
<h3><a class="anchor" aria-hidden="true" id="filter-operations"></a><a href="#filter-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Filter operations</h3>
<p>Filter operations retain some elements in a streamlet and exclude other elements on the basis of a provided filtering function.</p>
<h4><a class="anchor" aria-hidden="true" id="java-example-2"></a><a href="#java-example-2" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java">Streamlet&lt;Integer&gt; randomInts =
    builder.newSource(() -&gt; ThreadLocalRandom.current().nextInt(<span class="hljs-number">1</span>, <span class="hljs-number">11</span>));
Streamlet&lt;Integer&gt; lessThanSeven = randomInts
        .filter(i -&gt; i &lt;= <span class="hljs-number">7</span>);
</code></pre>
<p>In this example, a source streamlet consisting of random integers between 1 and 10 is modified by a filter operation that removes all streamlet elements that are greater than 7.</p>
<h3><a class="anchor" aria-hidden="true" id="union-operations"></a><a href="#union-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Union operations</h3>
<p>Union operations combine two streamlets of the same type into a single streamlet without modifying the elements.</p>
<h4><a class="anchor" aria-hidden="true" id="java-example-3"></a><a href="#java-example-3" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java">Streamlet&lt;String&gt; oohs = builder.newSource(() -&gt; <span class="hljs-string">"ooh"</span>);
Streamlet&lt;String&gt; aahs = builder.newSource(() -&gt; <span class="hljs-string">"aah"</span>);

Streamlet&lt;String&gt; combined = oohs
        .union(aahs);
</code></pre>
<p>Here, one streamlet is an endless series of &quot;ooh&quot;s while the other is an endless series of &quot;aah&quot;s. The <code>union</code> operation combines them into a single streamlet of alternating &quot;ooh&quot;s and &quot;aah&quot;s.</p>
<h3><a class="anchor" aria-hidden="true" id="clone-operations"></a><a href="#clone-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Clone operations</h3>
<p>Clone operations enable you to create any number of &quot;copies&quot; of a streamlet. Each of the &quot;copy&quot; streamlets contains all the elements of the original and can be manipulated just like the original streamlet.</p>
<h4><a class="anchor" aria-hidden="true" id="java-example-4"></a><a href="#java-example-4" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.List;
<span class="hljs-keyword">import</span> java.util.concurrent.ThreadLocalRandom;

Streamlet&lt;Integer&gt; integers = builder.newSource(() -&gt; ThreadLocalRandom.current().nextInt(<span class="hljs-number">100</span>));

List&lt;Streamlet&lt;Integer&gt;&gt; copies = integers.clone(<span class="hljs-number">5</span>);
Streamlet&lt;Integer&gt; ints1 = copies.get(<span class="hljs-number">0</span>);
Streamlet&lt;Integer&gt; ints2 = copies.get(<span class="hljs-number">1</span>);
Streamlet&lt;Integer&gt; ints3 = copies.get(<span class="hljs-number">2</span>);
<span class="hljs-comment">// and so on...</span>
</code></pre>
<p>In this example, a streamlet of random integers between 1 and 100 is split into 5 identical streamlets.</p>
<h3><a class="anchor" aria-hidden="true" id="transform-operations"></a><a href="#transform-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Transform operations</h3>
<p>Transform operations are highly flexible operations that are most useful for:</p>
<ul>
<li>operations involving state in <a href="heron-delivery-semantics#stateful-topologies">stateful topologies</a></li>
<li>operations that don't neatly fit into the other categories or into a lambda-based logic</li>
</ul>
<p>Transform operations require you to implement three different methods:</p>
<ul>
<li>A <code>setup</code> method that enables you to pass a context object to the operation and to specify what happens prior to the <code>transform</code> step</li>
<li>A <code>transform</code> operation that performs the desired transformation</li>
<li>A <code>cleanup</code> method that allows you to specify what happens after the <code>transform</code> step</li>
</ul>
<p>The context object available to a transform operation provides access to:</p>
<ul>
<li>the current state of the topology</li>
<li>the topology's configuration</li>
<li>the name of the stream</li>
<li>the stream partition</li>
<li>the current task ID</li>
</ul>
<p>Here's a Java example of a transform operation in a topology where a stateful record is kept of the number of items processed:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.Context;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.SerializableTransformer;

<span class="hljs-keyword">import</span> java.util.function.Consumer;

<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">CountNumberOfItems</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">SerializableTransformer</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">String</span>&gt; </span>{
    <span class="hljs-keyword">private</span> <span class="hljs-keyword">int</span> numberOfItems;

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">setup</span><span class="hljs-params">(Context context)</span> </span>{
        numberOfItems = (<span class="hljs-keyword">int</span>) context.getState(<span class="hljs-string">"number-of-items"</span>);
        context.getState().put(<span class="hljs-string">"number-of-items"</span>, numberOfItems + <span class="hljs-number">1</span>);
    }

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">transform</span><span class="hljs-params">(String in, Consumer&lt;String&gt; consumer)</span> </span>{
        String transformedString = <span class="hljs-comment">// Apply some operation to the incoming value</span>
        consumer.accept(transformedString);
    }

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">cleanup</span><span class="hljs-params">()</span> </span>{
        System.out.println(
                String.format(<span class="hljs-string">"Successfully processed new state: %d"</span>, numberOfItems));
    }
}
</code></pre>
<p>This operation does a few things:</p>
<ul>
<li>In the <code>setup</code> method, the <a href="/api/java/org/apache/heron/streamlet/Context.html"><code>Context</code></a> object is used to access the current state (which has the semantics of a Java <code>Map</code>). The current number of items processed is incremented by one and then saved as the new state.</li>
<li>In the <code>transform</code> method, the incoming string is transformed in some way and then &quot;accepted&quot; as the new value.</li>
<li>In the <code>cleanup</code> step, the current count of items processed is logged.</li>
</ul>
<p>Here's that operation within the context of a streamlet processing graph:</p>
<pre><code class="hljs css language-java">builder.newSource(() -&gt; <span class="hljs-string">"Some string over and over"</span>);
        .transform(<span class="hljs-keyword">new</span> CountNumberOfItems())
        .log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="key-by-operations"></a><a href="#key-by-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Key by operations</h3>
<p>Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet.</p>
<h4><a class="anchor" aria-hidden="true" id="java-example-5"></a><a href="#java-example-5" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;

Builder builder = Builder.newBuilder()
    .newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)
    <span class="hljs-comment">// Convert each sentence into individual words</span>
    .flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
    .keyBy(
        <span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span>
        word -&gt; word,
        <span class="hljs-comment">// Value extractor (get the length of each word)</span>
        word -&gt; workd.length()
    )
    <span class="hljs-comment">// The result is logged</span>
    .log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="reduce-by-key-operations"></a><a href="#reduce-by-key-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reduce by key operations</h3>
<p>You can apply <a href="https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html">reduce</a> operations to streamlets by specifying:</p>
<ul>
<li>a key extractor that determines what counts as the key for the streamlet</li>
<li>a value extractor that determines which final value is chosen for each element of the streamlet</li>
<li>a reduce function that produces a single value for each key in the streamlet</li>
</ul>
<p>Reduce by key operations produce a new streamlet of key-value window objects (which include a key-value pair including the extracted key and calculated value).</p>
<h4><a class="anchor" aria-hidden="true" id="java-example-6"></a><a href="#java-example-6" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;

Builder builder = Builder.newBuilder()
    .newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)
    <span class="hljs-comment">// Convert each sentence into individual words</span>
    .flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
    .reduceByKeyAndWindow(
        <span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span>
        word -&gt; word,
        <span class="hljs-comment">// Value extractor (each word appears only once, hence the value is always 1)</span>
        word -&gt; <span class="hljs-number">1</span>,
        <span class="hljs-comment">// Reduce operation (a running sum)</span>
        (x, y) -&gt; x + y
    )
    <span class="hljs-comment">// The result is logged</span>
    .log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="reduce-by-key-and-window-operations"></a><a href="#reduce-by-key-and-window-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reduce by key and window operations</h3>
<p>You can apply <a href="https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html">reduce</a> operations to streamlets by specifying:</p>
<ul>
<li>a key extractor that determines what counts as the key for the streamlet</li>
<li>a value extractor that determines which final value is chosen for each element of the streamlet</li>
<li>a <a href="heron-topology-concepts#window-operations">time window</a> across which the operation will take place</li>
<li>a reduce function that produces a single value for each key in the streamlet</li>
</ul>
<p>Reduce by key and window operations produce a new streamlet of key-value window objects (which include a key-value pair including the extracted key and calculated value, as well as information about the window in which the operation took place).</p>
<h4><a class="anchor" aria-hidden="true" id="java-example-7"></a><a href="#java-example-7" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;

<span class="hljs-keyword">import</span> org.apache.heron.streamlet.WindowConfig;

Builder builder = Builder.newBuilder();

builder.newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)
    .flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
    .reduceByKeyAndWindow(
        <span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span>
        word -&gt; word,
        <span class="hljs-comment">// Value extractor (each word appears only once, hence the value is always 1)</span>
        word -&gt; <span class="hljs-number">1</span>,
        <span class="hljs-comment">// Window configuration</span>
        WindowConfig.TumblingCountWindow(<span class="hljs-number">50</span>),
        <span class="hljs-comment">// Reduce operation (a running sum)</span>
        (x, y) -&gt; x + y
    )
    <span class="hljs-comment">// The result is logged</span>
    .log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="count-by-key-operations"></a><a href="#count-by-key-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Count by key operations</h3>
<p>Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered.</p>
<h4><a class="anchor" aria-hidden="true" id="java-example-8"></a><a href="#java-example-8" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;

Builder builder = Builder.newBuilder()
    .newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)
    <span class="hljs-comment">// Convert each sentence into individual words</span>
    .flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
    .countByKeyAndWindow(word -&gt; word)
    <span class="hljs-comment">// The result is logged</span>
    .log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="count-by-key-and-window-operations"></a><a href="#count-by-key-and-window-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Count by key and window operations</h3>
<p>Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each <a href="#windowing">time window</a>.</p>
<h4><a class="anchor" aria-hidden="true" id="java-example-9"></a><a href="#java-example-9" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;

<span class="hljs-keyword">import</span> org.apache.heron.streamlet.WindowConfig;

Builder builder = Builder.newBuilder()
    .newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)
    <span class="hljs-comment">// Convert each sentence into individual words</span>
    .flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
    .countByKeyAndWindow(
        <span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span>
        word -&gt; word,
        <span class="hljs-comment">// Window configuration</span>
        WindowConfig.TumblingCountWindow(<span class="hljs-number">50</span>),
    )
    <span class="hljs-comment">// The result is logged</span>
    .log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="split-operations"></a><a href="#split-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Split operations</h3>
<p>Split operations split a streamlet into multiple streamlets with different id by getting the corresponding stream ids from each item in the origina streamlet.</p>
<h4><a class="anchor" aria-hidden="true" id="java-example-10"></a><a href="#java-example-10" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;

Map&lt;String, SerializablePredicate&lt;String&gt;&gt; splitter = <span class="hljs-keyword">new</span> HashMap();
    splitter.put(<span class="hljs-string">"long_word"</span>, s -&gt; s.length() &gt;= <span class="hljs-number">4</span>);
    splitter.put(<span class="hljs-string">"short_word"</span>, s -&gt; s.length() &lt; <span class="hljs-number">4</span>);

Builder builder = Builder.newBuilder()
    .newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)
    <span class="hljs-comment">// Convert each sentence into individual words</span>
    .flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
    <span class="hljs-comment">// Splits the stream into streams of long and short words</span>
    .split(splitter)
    <span class="hljs-comment">// Choose the stream of the short words</span>
    .withStream(<span class="hljs-string">"short_word"</span>)
    <span class="hljs-comment">// The result is logged</span>
    .log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="with-stream-operations"></a><a href="#with-stream-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>With stream operations</h3>
<p>With stream operations select a stream with id from a streamlet that contains multiple streams. They are often used with <a href="#split-operations">split</a>.</p>
<h3><a class="anchor" aria-hidden="true" id="apply-operator-operations"></a><a href="#apply-operator-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Apply operator operations</h3>
<p>Apply operator operations apply a user defined operator (like a bolt) to each element of the original streamlet and return a new streamlet.</p>
<h4><a class="anchor" aria-hidden="true" id="java-example-11"></a><a href="#java-example-11" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;

<span class="hljs-keyword">private</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">MyBoltOperator</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">MyBolt</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">IStreamletRichOperator</span>&lt;<span class="hljs-title">Double</span>, <span class="hljs-title">Double</span>&gt; </span>{
}

Builder builder = Builder.newBuilder()
    .newSource(() -&gt; <span class="hljs-string">"Mary had a little lamb"</span>)
    <span class="hljs-comment">// Convert each sentence into individual words</span>
    .flatMap(sentence -&gt; Arrays.asList(sentence.toLowerCase().split(<span class="hljs-string">"\\s+"</span>)))
    <span class="hljs-comment">// Apply user defined operation</span>
    .applyOperator(<span class="hljs-keyword">new</span> MyBoltOperator())
    <span class="hljs-comment">// The result is logged</span>
    .log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="join-operations"></a><a href="#join-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Join operations</h3>
<p>Join operations in the Streamlet API take two streamlets (a &quot;left&quot; and a &quot;right&quot; streamlet) and join them together:</p>
<ul>
<li>based on a key extractor for each streamlet</li>
<li>over key-value elements accumulated during a specified <a href="#windowing">time window</a></li>
<li>based on a <a href="#join-types">join type</a> (<a href="#inner-joins">inner</a>, <a href="#outer-left-joins">outer left</a>, <a href="#outer-right-joins">outer right</a>, or <a href="#outer-joins">outer</a>)</li>
<li>using a join function that specifies <em>how</em> values will be processed</li>
</ul>
<p>You may already be familiar with <code>JOIN</code> operations in SQL databases, like this:</p>
<pre><code class="hljs css language-sql"><span class="hljs-keyword">SELECT</span> username, email
<span class="hljs-keyword">FROM</span> all_users
<span class="hljs-keyword">INNER</span> <span class="hljs-keyword">JOIN</span> banned_users <span class="hljs-keyword">ON</span> all_users.username <span class="hljs-keyword">NOT</span> <span class="hljs-keyword">IN</span> banned_users.username;
</code></pre>
<blockquote>
<p>If you'd like to unite two streamlets into one <em>without</em> applying a window or a join function, you can use a <a href="#union-operations">union</a> operation, which are available for key-value streamlets as well as normal streamlets.</p>
</blockquote>
<p>All join operations are performed:</p>
<ol>
<li>Over elements accumulated during a specified <a href="#windowing">time window</a></li>
<li>In accordance with a key and value extracted from each streamlet element (you must provide extractor functions for both)</li>
<li>In accordance with a join function that produces a &quot;joined&quot; value for each pair of streamlet elements</li>
</ol>
<h4><a class="anchor" aria-hidden="true" id="join-types"></a><a href="#join-types" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Join types</h4>
<p>The Heron Streamlet API supports four types of joins:</p>
<table>
<thead>
<tr><th style="text-align:left">Type</th><th style="text-align:left">What the join operation yields</th><th style="text-align:left">Default?</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="#inner-joins">Inner</a></td><td style="text-align:left">All key-values with matched keys across the left and right stream</td><td style="text-align:left">Yes</td></tr>
<tr><td style="text-align:left"><a href="#outer-left-joins">Outer left</a></td><td style="text-align:left">All key-values with matched keys across both streams plus unmatched keys in the left stream</td></tr>
<tr><td style="text-align:left"><a href="#outer-right-joins">Outer right</a></td><td style="text-align:left">All key-values with matched keys across both streams plus unmatched keys in the left stream</td></tr>
<tr><td style="text-align:left"><a href="#outer-joins">Outer</a></td><td style="text-align:left">All key-values across both the left and right stream, regardless of whether or not any given element has a matching key in the other stream</td></tr>
</tbody>
</table>
<h4><a class="anchor" aria-hidden="true" id="inner-joins"></a><a href="#inner-joins" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Inner joins</h4>
<p>Inner joins operate over the <a href="https://en.wikipedia.org/wiki/Cartesian_product">Cartesian product</a> of the left stream and the right stream, i.e. over all the whole set of all ordered pairs between the two streams. Imagine this set of key-value pairs accumulated within a time window:</p>
<table>
<thead>
<tr><th style="text-align:left">Left streamlet</th><th style="text-align:left">Right streamlet</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">(&quot;player1&quot;, 4)</td><td style="text-align:left">(&quot;player1&quot;, 10)</td></tr>
<tr><td style="text-align:left">(&quot;player1&quot;, 5)</td><td style="text-align:left">(&quot;player1&quot;, 12)</td></tr>
<tr><td style="text-align:left">(&quot;player1&quot;, 17)</td><td style="text-align:left">(&quot;player2&quot;, 27)</td></tr>
</tbody>
</table>
<p>An inner join operation would thus apply the join function to all key-values with matching keys, thus <strong>3 × 2 = 6</strong> in total, producing this set of key-values:</p>
<table>
<thead>
<tr><th style="text-align:left">Included key-values</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">(&quot;player1&quot;, 4)</td></tr>
<tr><td style="text-align:left">(&quot;player1&quot;, 5)</td></tr>
<tr><td style="text-align:left">(&quot;player1&quot;, 10)</td></tr>
<tr><td style="text-align:left">(&quot;player1&quot;, 12)</td></tr>
<tr><td style="text-align:left">(&quot;player1&quot;, 17)</td></tr>
</tbody>
</table>
<blockquote>
<p>Note that the <code>(&quot;player2&quot;, 27)</code> key-value pair was <em>not</em> included in the stream because there's no matching key-value in the left streamlet.</p>
</blockquote>
<p>If the supplied join function, say, added the values together, then the resulting joined stream would look like this:</p>
<table>
<thead>
<tr><th style="text-align:left">Operation</th><th style="text-align:left">Joined Streamlet</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">4 + 10</td><td style="text-align:left">(&quot;player1&quot;, 14)</td></tr>
<tr><td style="text-align:left">4 + 12</td><td style="text-align:left">(&quot;player1&quot;, 16)</td></tr>
<tr><td style="text-align:left">5 + 10</td><td style="text-align:left">(&quot;player1&quot;, 15)</td></tr>
<tr><td style="text-align:left">5 + 12</td><td style="text-align:left">(&quot;player1&quot;, 17)</td></tr>
<tr><td style="text-align:left">17 + 10</td><td style="text-align:left">(&quot;player1&quot;, 27)</td></tr>
<tr><td style="text-align:left">17 + 12</td><td style="text-align:left">(&quot;player1&quot;, 29)</td></tr>
</tbody>
</table>
<blockquote>
<p>Inner joins are the &quot;default&quot; join type in the Heron Streamlet API. If you call the <code>join</code> method without specifying a join type, an inner join will be applied.</p>
</blockquote>
<h5><a class="anchor" aria-hidden="true" id="java-example-12"></a><a href="#java-example-12" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h5>
<pre><code class="hljs css language-java"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Score</span> </span>{
    String playerUsername;
    <span class="hljs-keyword">int</span> playerScore;

    <span class="hljs-comment">// Setters and getters</span>
}

Streamlet&lt;Score&gt; scores1 = <span class="hljs-comment">/* A stream of player scores */</span>;
Streamlet&lt;Score&gt; scores2 = <span class="hljs-comment">/* A second stream of player scores */</span>;

scores1
    .join(
        scores2,
        <span class="hljs-comment">// Key extractor for the left stream (scores1)</span>
        score -&gt; score.getPlayerUsername(),
        <span class="hljs-comment">// Key extractor for the right stream (scores2)</span>
        score -&gt; score.getPlayerScore(),
        <span class="hljs-comment">// Window configuration</span>
        WindowConfig.TumblingCountWindow(<span class="hljs-number">50</span>),
        <span class="hljs-comment">// Join function (selects the larger score as the value using</span>
        <span class="hljs-comment">// using a ternary operator)</span>
        (x, y) -&gt;
            (x.getPlayerScore() &gt;= y.getPlayerScore()) ?
                x.getPlayerScore() :
                y.getPlayerScore()
    )
    .log();
</code></pre>
<p>In this example, two streamlets consisting of <code>Score</code> objects are joined. In the <code>join</code> function, a key and value extractor are supplied along with a window configuration and a join function. The resulting, joined streamlet will consist of key-value pairs in which each player's username will be the key and the joined---in this case highest---score will be the value.</p>
<p>By default, an <a href="#inner-joins">inner join</a> is applied in join operations but you can also specify a different join type. Here's a Java example for an <a href="#outer-right-joins">outer right</a> join:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.JoinType;

scores1
    .join(
        scores2,
        <span class="hljs-comment">// Key extractor for the left stream (scores1)</span>
        score -&gt; score.getPlayerUsername(),
        <span class="hljs-comment">// Key extractor for the right stream (scores2)</span>
        score -&gt; score.getPlayerScore(),
        <span class="hljs-comment">// Window configuration</span>
        WindowConfig.TumblingCountWindow(<span class="hljs-number">50</span>),
        <span class="hljs-comment">// Join type</span>
        JoinType.OUTER_RIGHT,
        <span class="hljs-comment">// Join function (selects the larger score as the value using</span>
        <span class="hljs-comment">// using a ternary operator)</span>
        (x, y) -&gt;
            (x.getPlayerScore() &gt;= y.getPlayerScore()) ?
                x.getPlayerScore() :
                y.getPlayerScore()
    )
    .log();
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="outer-left-joins"></a><a href="#outer-left-joins" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Outer left joins</h4>
<p>An outer left join includes the results of an <a href="#inner-joins">inner join</a> <em>plus</em> all of the unmatched keys in the left stream. Take this example left and right streamlet:</p>
<table>
<thead>
<tr><th style="text-align:left">Left streamlet</th><th style="text-align:left">Right streamlet</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">(&quot;player1&quot;, 4)</td><td style="text-align:left">(&quot;player1&quot;, 10)</td></tr>
<tr><td style="text-align:left">(&quot;player2&quot;, 5)</td><td style="text-align:left">(&quot;player4&quot;, 12)</td></tr>
<tr><td style="text-align:left">(&quot;player3&quot;, 17)</td></tr>
</tbody>
</table>
<p>The resulting set of key-values within the time window:</p>
<table>
<thead>
<tr><th style="text-align:left">Included key-values</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">(&quot;player1&quot;, 4)</td></tr>
<tr><td style="text-align:left">(&quot;player1&quot;, 10)</td></tr>
<tr><td style="text-align:left">(&quot;player2&quot;, 5)</td></tr>
<tr><td style="text-align:left">(&quot;player3&quot;, 17)</td></tr>
</tbody>
</table>
<p>In this case, key-values with a key of <code>player4</code> are excluded because they are in the right stream but have no matching key with any element in the left stream.</p>
<h4><a class="anchor" aria-hidden="true" id="outer-right-joins"></a><a href="#outer-right-joins" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Outer right joins</h4>
<p>An outer right join includes the results of an <a href="#inner-joins">inner join</a> <em>plus</em> all of the unmatched keys in the right stream. Take this example left and right streamlet (from <a href="#outer-left-joins">above</a>):</p>
<table>
<thead>
<tr><th style="text-align:left">Left streamlet</th><th style="text-align:left">Right streamlet</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">(&quot;player1&quot;, 4)</td><td style="text-align:left">(&quot;player1&quot;, 10)</td></tr>
<tr><td style="text-align:left">(&quot;player2&quot;, 5)</td><td style="text-align:left">(&quot;player4&quot;, 12)</td></tr>
<tr><td style="text-align:left">(&quot;player3&quot;, 17)</td></tr>
</tbody>
</table>
<p>The resulting set of key-values within the time window:</p>
<table>
<thead>
<tr><th style="text-align:left">Included key-values</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">(&quot;player1&quot;, 4)</td></tr>
<tr><td style="text-align:left">(&quot;player1&quot;, 10)</td></tr>
<tr><td style="text-align:left">(&quot;player2&quot;, 5)</td></tr>
<tr><td style="text-align:left">(&quot;player4&quot;, 17)</td></tr>
</tbody>
</table>
<p>In this case, key-values with a key of <code>player3</code> are excluded because they are in the left stream but have no matching key with any element in the right stream.</p>
<h4><a class="anchor" aria-hidden="true" id="outer-joins"></a><a href="#outer-joins" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Outer joins</h4>
<p>Outer joins include <em>all</em> key-values across both the left and right stream, regardless of whether or not any given element has a matching key in the other stream. If you want to ensure that no element is left out of a resulting joined streamlet, use an outer join. Take this example left and right streamlet (from <a href="#outer-left-joins">above</a>):</p>
<table>
<thead>
<tr><th style="text-align:left">Left streamlet</th><th style="text-align:left">Right streamlet</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">(&quot;player1&quot;, 4)</td><td style="text-align:left">(&quot;player1&quot;, 10)</td></tr>
<tr><td style="text-align:left">(&quot;player2&quot;, 5)</td><td style="text-align:left">(&quot;player4&quot;, 12)</td></tr>
<tr><td style="text-align:left">(&quot;player3&quot;, 17)</td></tr>
</tbody>
</table>
<p>The resulting set of key-values within the time window:</p>
<table>
<thead>
<tr><th style="text-align:left">Included key-values</th></tr>
</thead>
<tbody>
</tbody>
</table>
<p>(&quot;player1&quot;, 4)
(&quot;player1&quot;, 10)
(&quot;player2&quot;, 5)
(&quot;player4&quot;, 12)
(&quot;player3&quot;, 17)</p>
<blockquote>
<p>Note that <em>all</em> key-values were indiscriminately included in the joined set.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="sink-operations"></a><a href="#sink-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Sink operations</h3>
<p>In processing graphs like the ones you build using the Heron Streamlet API, <strong>sinks</strong> are essentially the terminal points in your graph, where your processing logic comes to an end. A processing graph can end with writing to a database, publishing to a topic in a pub-sub messaging system, and so on. With the Streamlet API, you can implement your own custom sinks.</p>
<h4><a class="anchor" aria-hidden="true" id="java-example-13"></a><a href="#java-example-13" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.Context;
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.Sink;

<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">FormattedLogSink</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Sink</span>&lt;<span class="hljs-title">T</span>&gt; </span>{
    <span class="hljs-keyword">private</span> String streamletName;

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">setup</span><span class="hljs-params">(Context context)</span> </span>{
        streamletName = context.getStreamletName();
    }

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">put</span><span class="hljs-params">(T element)</span> </span>{
        String message = String.format(<span class="hljs-string">"Streamlet %s has produced an element with a value of: '%s'"</span>,
                streamletName,
                element.toString());
        System.out.println(message);
    }

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">cleanup</span><span class="hljs-params">()</span> </span>{}
}
</code></pre>
<p>In this example, the sink fetches the name of the enclosing streamlet from the context passed in the <code>setup</code> method. The <code>put</code> method specifies how the sink handles each element that is received (in this case, a formatted message is logged to stdout). The <code>cleanup</code> method enables you to specify what happens after the element has been processed by the sink.</p>
<p>Here is the <code>FormattedLogSink</code> at work in an example processing graph:</p>
<pre><code class="hljs css language-java">Builder builder = Builder.newBuilder();

builder.newSource(() -&gt; <span class="hljs-string">"Here is a string to be passed to the sink"</span>)
        .toSink(<span class="hljs-keyword">new</span> FormattedLogSink());
</code></pre>
<blockquote>
<p><a href="#log-operations">Log operations</a> rely on a log sink that is provided out of the box. You'll need to implement other sinks yourself.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="consume-operations"></a><a href="#consume-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consume operations</h3>
<p>Consume operations are like <a href="#sink-operations">sink operations</a> except they don't require implementing a full sink interface. Consume operations are thus suited for simple operations like formatted logging.</p>
<h4><a class="anchor" aria-hidden="true" id="java-example-14"></a><a href="#java-example-14" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java example</h4>
<pre><code class="hljs css language-java">Builder builder = Builder.newBuilder()
        .newSource(() -&gt; generateRandomInteger())
        .filter(i -&gt; i % <span class="hljs-number">2</span> == <span class="hljs-number">0</span>)
        .consume(i -&gt; {
            String message = String.format(<span class="hljs-string">"Even number found: %d"</span>, i);
            System.out.println(message);
        });
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="partitioning"></a><a href="#partitioning" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Partitioning</h2>
<p>In the topology API, processing parallelism can be managed via adjusting the number of spouts and bolts performing different operations, enabling you to, for example, increase the relative parallelism of a bolt by using three of that bolt instead of two.</p>
<p>The Heron Streamlet API provides a different mechanism for controlling parallelism: <strong>partitioning</strong>. To understand partitioning, keep in mind that rather than physical spouts and bolts, the core processing construct in the Heron Streamlet API is the processing step. With the Heron Streamlet API, you can explicitly assign a number of partitions to each processing step in your graph (the default is one partition).</p>
<p>The example topology <a href="#streamlets">above</a>, for example, has five steps:</p>
<ul>
<li>the random integer source</li>
<li>the &quot;add one&quot; map operation</li>
<li>the union operation</li>
<li>the filtering operation</li>
<li>the logging operation.</li>
</ul>
<p>You could apply varying numbers of partitions to each step in that topology like this:</p>
<pre><code class="hljs css language-java">Builder builder = Builder.newBuilder();

Streamlet&lt;Integer&gt; zeroes = builder.newSource(() -&gt; <span class="hljs-number">0</span>)
        .setName(<span class="hljs-string">"zeroes"</span>);

builder.newSource(() -&gt; ThreadLocalRandom.current().nextInt(<span class="hljs-number">1</span>, <span class="hljs-number">11</span>))
        .setName(<span class="hljs-string">"random-ints"</span>)
        .setNumPartitions(<span class="hljs-number">3</span>)
        .map(i -&gt; i + <span class="hljs-number">1</span>)
        .setName(<span class="hljs-string">"add-one"</span>)
        .repartition(<span class="hljs-number">3</span>)
        .union(zeroes)
        .setName(<span class="hljs-string">"unify-streams"</span>)
        .repartition(<span class="hljs-number">2</span>)
        .filter(i -&gt; i != <span class="hljs-number">2</span>)
        .setName(<span class="hljs-string">"remove-all-twos"</span>)
        .repartition(<span class="hljs-number">1</span>)
        .log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="repartition-operations"></a><a href="#repartition-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Repartition operations</h3>
<p>As explained <a href="#partitioning">above</a>, when you set a number of partitions for a specific operation (included for source streamlets), the same number of partitions is applied to all downstream operations <em>until</em> a different number is explicitly set.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.Arrays;

Builder builder = Builder.newBuilder();

builder.newSource(() -&gt; ThreadLocalRandom.current().nextInt(<span class="hljs-number">1</span>, <span class="hljs-number">11</span>))
    .repartition(<span class="hljs-number">4</span>, (element, numPartitions) -&gt; {
        <span class="hljs-keyword">if</span> (element &gt; <span class="hljs-number">5</span>) {
            <span class="hljs-keyword">return</span> Arrays.asList(<span class="hljs-number">0</span>, <span class="hljs-number">1</span>);
        } <span class="hljs-keyword">else</span> {
            <span class="hljs-keyword">return</span> Arrays.asList(<span class="hljs-number">2</span>, <span class="hljs-number">3</span>);
        }
    });
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/0.20.1-incubating/heron-topology-concepts"><span class="arrow-prev">← </span><span>Heron Topologies</span></a><a class="docs-next button" href="/docs/0.20.1-incubating/heron-architecture"><span>Heron Architecture</span><span class="arrow-next"> →</span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#streamlet-api-topology-model">Streamlet API topology model</a><ul class="toc-headings"><li><a href="#streamlets">Streamlets</a></li><li><a href="#supported-languages">Supported languages</a></li><li><a href="#the-heron-streamlet-api-and-topologies">The Heron Streamlet API and topologies</a></li></ul></li><li><a href="#streamlet-operations">Streamlet operations</a><ul class="toc-headings"><li><a href="#map-operations">Map operations</a></li><li><a href="#flatmap-operations">FlatMap operations</a></li><li><a href="#filter-operations">Filter operations</a></li><li><a href="#union-operations">Union operations</a></li><li><a href="#clone-operations">Clone operations</a></li><li><a href="#transform-operations">Transform operations</a></li><li><a href="#key-by-operations">Key by operations</a></li><li><a href="#reduce-by-key-operations">Reduce by key operations</a></li><li><a href="#reduce-by-key-and-window-operations">Reduce by key and window operations</a></li><li><a href="#count-by-key-operations">Count by key operations</a></li><li><a href="#count-by-key-and-window-operations">Count by key and window operations</a></li><li><a href="#split-operations">Split operations</a></li><li><a href="#with-stream-operations">With stream operations</a></li><li><a href="#apply-operator-operations">Apply operator operations</a></li><li><a href="#join-operations">Join operations</a></li><li><a href="#sink-operations">Sink operations</a></li><li><a href="#consume-operations">Consume operations</a></li></ul></li><li><a href="#partitioning">Partitioning</a><ul class="toc-headings"><li><a href="#repartition-operations">Repartition operations</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><div class="apache-disclaimer">Apache Heron is an effort undergoing incubation at <a target="_blank" href="https://apache.org/">The Apache Software Foundation (ASF)</a> sponsored by the Apache Incubator PMC. Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects. While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.<br/><br/>Apache®, the names of Apache projects, and the feather logo are either <a rel="external" href="https://www.apache.org/foundation/marks/list/">registered trademarks or trademarks</a> of the Apache Software Foundation in the United States and/or other countries.<br/><br/><div class="copyright-box">Copyright © 2023 the Apache Software Foundation, Apache Heron, Heron, 
  Apache, the Apache feather Logo, and the Apache Heron project logo are either registered 
  trademarks or trademarks of the Apache Software Foundation.</div></div><div class="apache-links"><a class="item" rel="external" href="https://incubator.apache.org/">Apache Incubator</a><div><a class="item" rel="external" href="https://www.apache.org/">About the ASF</a></div><div><a class="item" rel="external" href="https://www.apache.org/events/current-event">Events</a></div><div><a class="item" rel="external" href="https://www.apache.org/foundation/thanks.html">Thanks</a></div><div><a class="item" rel="external" href="https://www.apache.org/foundation/sponsorship.html">Become a Sponsor</a></div><div><a class="item" rel="external" href="https://www.apache.org/security/">Security</a></div><div><a class="item" rel="external" href="https://www.apache.org/licenses/">License</a></div></div></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>