<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>The Heron Streamlet API for Scala · Apache Heron</title><meta name="viewport" content="width=device-width"/><meta name="generator" content="Docusaurus"/><meta name="description" content="&lt;!--"/><meta name="docsearch:version" content="0.20.5-incubating"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="The Heron Streamlet API for Scala · Apache Heron"/><meta property="og:type" content="website"/><meta property="og:url" content="https://heron.apache.org/"/><meta property="og:description" content="&lt;!--"/><meta property="og:image" content="https://heron.apache.org/img/undraw_online.svg"/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://heron.apache.org/img/undraw_tweetstorm.svg"/><link rel="shortcut icon" href="/img/favicon-32x32.png"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/default.min.css"/><link rel="alternate" type="application/atom+xml" href="https://heron.apache.org/blog/atom.xml" title="Apache Heron Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://heron.apache.org/blog/feed.xml" title="Apache Heron Blog RSS Feed"/><script>
              (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
              (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
              m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
              })(window,document,'script','https://www.google-analytics.com/analytics.js','ga');

              ga('create', 'UA-198017384-1', 'auto');
              ga('send', 'pageview');
            </script><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="/js/custom.js"></script><script type="text/javascript" src="/js/fix-location.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/"><img class="logo" src="/img/HeronTextLogo-small.png" alt="Apache Heron"/><h2 class="headerTitleWithLogo">Apache Heron</h2></a><a href="/versions"><h3>0.20.5-incubating</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class=""><a href="/api/java" target="_self">Javadocs</a></li><li class=""><a href="/api/python" target="_self">Pydocs</a></li><li class="siteNavGroupActive"><a href="/docs/getting-started-local-single-node" target="_self">Docs</a></li><li class=""><a href="/download" target="_self">Downloads</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#apache" target="_self">Apache</a></li></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="container docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i>›</i><span>Topology Development APIs</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Getting Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/getting-started-local-single-node">Local (Single 
Node)</a></li><li class="navListItem"><a class="navItem" href="/docs/getting-started-migrate-storm-topologies">Migrate Storm Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/getting-started-docker">Heron &amp; Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/getting-started-troubleshooting-guide">Troubleshooting Guide</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/deployment-overview">Deployment Overiew</a></li><li class="navListItem"><a class="navItem" href="/docs/deployment-configuration">Configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/deployment-api-server">The Heron API Server</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Topology Development APIs</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/topology-development-streamlet-api">The Heron Streamlet API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/topology-development-eco-api">The ECO API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/topology-development-topology-api-java">The Heron Topology API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/topology-development-topology-api-python">The Heron Topology API for Python</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/topology-development-streamlet-scala">The Heron Streamlet API for Scala</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client API Docs</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/client-api-docs-overview">Client API Docs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Guides</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/guides-effectively-once-java-topologies">Effectively Once Java 
Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/guides-data-model">Heron Data Model</a></li><li class="navListItem"><a class="navItem" href="/docs/guides-tuple-serialization">Tuple Serialization</a></li><li class="navListItem"><a class="navItem" href="/docs/guides-ui-guide">Heron UI Guide</a></li><li class="navListItem"><a class="navItem" href="/docs/guides-topology-tuning">Topology Tuning Guide</a></li><li class="navListItem"><a class="navItem" href="/docs/guides-packing-algorithms">Packing Algorithms</a></li><li class="navListItem"><a class="navItem" href="/docs/guides-simulator-mode">Simulator Mode</a></li><li class="navListItem"><a class="navItem" href="/docs/guides-troubeshooting-guide">Topology Troubleshooting Guide</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Heron Concepts</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/heron-design-goals">Heron Design Goals</a></li><li class="navListItem"><a class="navItem" href="/docs/heron-topology-concepts">Heron Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/heron-streamlet-concepts">Heron Streamlets</a></li><li class="navListItem"><a class="navItem" href="/docs/heron-architecture">Heron Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/heron-delivery-semantics">Heron Delivery Semantics</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">State Managers</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/state-managers-zookeeper">Zookeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/state-managers-local-fs">Local File System</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Uploaders</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/uploaders-local-fs">Local File System</a></li><li class="navListItem"><a class="navItem" href="/docs/uploaders-hdfs">HDFS</a></li><li 
class="navListItem"><a class="navItem" href="/docs/uploaders-http">HTTP</a></li><li class="navListItem"><a class="navItem" href="/docs/uploaders-amazon-s3">Amazon S3</a></li><li class="navListItem"><a class="navItem" href="/docs/uploaders-scp">Secure Copy (SCP)</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Schedulers</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/schedulers-k8s-by-hand">Kubernetes by hand</a></li><li class="navListItem"><a class="navItem" href="/docs/schedulers-k8s-with-helm">Kubernetes with Helm</a></li><li class="navListItem"><a class="navItem" href="/docs/schedulers-k8s-execution-environment">Kubernetes Environment Customization</a></li><li class="navListItem"><a class="navItem" href="/docs/schedulers-aurora-cluster">Aurora Cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/schedulers-aurora-local">Aurora Locally</a></li><li class="navListItem"><a class="navItem" href="/docs/schedulers-local">Local Cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/schedulers-nomad">Nomad</a></li><li class="navListItem"><a class="navItem" href="/docs/schedulers-mesos-local-mac">Mesos Cluster Locally</a></li><li class="navListItem"><a class="navItem" href="/docs/schedulers-slurm">Slurm Cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/schedulers-yarn">YARN Cluster</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cluster Configuration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/cluster-config-overview">Cluster Config Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/cluster-config-system-level">System Level Configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/cluster-config-instance">Heron Instance</a></li><li class="navListItem"><a class="navItem" href="/docs/cluster-config-metrics">Metrics Manager</a></li><li class="navListItem"><a class="navItem" 
href="/docs/cluster-config-stream">Stream Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/cluster-config-tmanager">Topology Manager</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Observability</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/observability-prometheus">Prometheus</a></li><li class="navListItem"><a class="navItem" href="/docs/observability-graphite">Graphite</a></li><li class="navListItem"><a class="navItem" href="/docs/observability-scribe">Scribe</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">User Manuals</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/user-manuals-heron-cli">Heron Client</a></li><li class="navListItem"><a class="navItem" href="/docs/user-manuals-heron-explorer">Heron Explorer</a></li><li class="navListItem"><a class="navItem" href="/docs/user-manuals-tracker-rest">Heron Tracker REST API</a></li><li class="navListItem"><a class="navItem" href="/docs/user-manuals-heron-tracker-runbook">Heron Tracker Runbook</a></li><li class="navListItem"><a class="navItem" href="/docs/user-manuals-heron-ui-runbook">Heron UI Runbook</a></li><li class="navListItem"><a class="navItem" href="/docs/user-manuals-heron-shell">Heron Shell</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Compiling</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/compiling-overview">Compiling Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/compiling-linux">Compiling on Linux</a></li><li class="navListItem"><a class="navItem" href="/docs/compiling-osx">Compiling on OS X</a></li><li class="navListItem"><a class="navItem" href="/docs/compiling-docker">Compiling With Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/compiling-running-tests">Running Tests</a></li><li class="navListItem"><a class="navItem" href="/docs/compiling-code-organization">Code 
Organization</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Extending Heron</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/extending-heron-scheduler">Custom Scheduler</a></li><li class="navListItem"><a class="navItem" href="/docs/extending-heron-metric-sink">Custom Metrics Sink</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Heron Resources</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/heron-resources-resources">Heron Resources</a></li></ul></div></div></section></div><script>
            var coll = document.getElementsByClassName('collapsible');
            var checkActiveCategory = true;
            for (var i = 0; i < coll.length; i++) {
              var links = coll[i].nextElementSibling.getElementsByTagName('*');
              if (checkActiveCategory){
                for (var j = 0; j < links.length; j++) {
                  if (links[j].classList.contains('navListItemActive')){
                    coll[i].nextElementSibling.classList.toggle('hide');
                    coll[i].childNodes[1].classList.toggle('rotate');
                    checkActiveCategory = false;
                    break;
                  }
                }
              }

              coll[i].addEventListener('click', function() {
                var arrow = this.childNodes[1];
                arrow.classList.toggle('rotate');
                var content = this.nextElementSibling;
                content.classList.toggle('hide');
              });
            }

            document.addEventListener('DOMContentLoaded', function() {
              createToggler('#navToggler', '#docsNav', 'docsSliderActive');
              createToggler('#tocToggler', 'body', 'tocActive');

              var headings = document.querySelector('.toc-headings');
              headings && headings.addEventListener('click', function(event) {
                var el = event.target;
                while(el !== headings){
                  if (el.tagName === 'A') {
                    document.body.classList.remove('tocActive');
                    break;
                  } else{
                    el = el.parentNode;
                  }
                }
              }, false);

              function createToggler(togglerSelector, targetSelector, className) {
                var toggler = document.querySelector(togglerSelector);
                var target = document.querySelector(targetSelector);

                if (!toggler) {
                  return;
                }

                toggler.onclick = function(event) {
                  event.preventDefault();

                  target.classList.toggle(className);
                };
              }
            });
        </script></nav></div><div class="container mainContainer"><div class="wrapper"><div class="post"><header class="postHeader"><h1 class="postHeaderTitle">The Heron Streamlet API for Scala</h1></header><article><div><span><!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
      http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
-->
<h2><a class="anchor" aria-hidden="true" id="getting-started"></a><a href="#getting-started" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Getting started</h2>
<p>To use the Heron Streamlet API for Scala, add the <code>heron-api</code> library to your project.</p>
<h3><a class="anchor" aria-hidden="true" id="maven-setup"></a><a href="#maven-setup" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Maven setup</h3>
<p>In order to use the <code>heron-api</code> library, add this to the <code>dependencies</code> block of your <code>pom.xml</code> configuration file:</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.heron<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>heron-api<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>{{<span class="hljs-tag">&lt; <span class="hljs-attr">heronVersion</span> &gt;</span>}}<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="compiling-a-jar-with-dependencies"></a><a href="#compiling-a-jar-with-dependencies" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Compiling a JAR with dependencies</h4>
<p>To run a Scala topology created using the Heron Streamlet API in a Heron cluster, you'll need to package your topology as a &quot;fat&quot; JAR with dependencies included. You can use the <a href="https://maven.apache.org/plugins/maven-assembly-plugin/usage.html">Maven Assembly Plugin</a> to generate JARs with dependencies. To enable the plugin and bind its <code>single</code> goal to the <code>package</code> phase, add this to the <code>plugins</code> block in your <code>pom.xml</code>:</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">plugin</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>maven-assembly-plugin<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">configuration</span>&gt;</span>
        <span class="hljs-tag">&lt;<span class="hljs-name">descriptorRefs</span>&gt;</span>
            <span class="hljs-tag">&lt;<span class="hljs-name">descriptorRef</span>&gt;</span>jar-with-dependencies<span class="hljs-tag">&lt;/<span class="hljs-name">descriptorRef</span>&gt;</span>
        <span class="hljs-tag">&lt;/<span class="hljs-name">descriptorRefs</span>&gt;</span>
        <span class="hljs-tag">&lt;<span class="hljs-name">archive</span>&gt;</span>
            <span class="hljs-tag">&lt;<span class="hljs-name">manifest</span>&gt;</span>
                <span class="hljs-tag">&lt;<span class="hljs-name">mainClass</span>&gt;</span><span class="hljs-tag">&lt;/<span class="hljs-name">mainClass</span>&gt;</span>
            <span class="hljs-tag">&lt;/<span class="hljs-name">manifest</span>&gt;</span>
        <span class="hljs-tag">&lt;/<span class="hljs-name">archive</span>&gt;</span>
    <span class="hljs-tag">&lt;/<span class="hljs-name">configuration</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">executions</span>&gt;</span>
        <span class="hljs-tag">&lt;<span class="hljs-name">execution</span>&gt;</span>
            <span class="hljs-tag">&lt;<span class="hljs-name">id</span>&gt;</span>make-assembly<span class="hljs-tag">&lt;/<span class="hljs-name">id</span>&gt;</span>
            <span class="hljs-tag">&lt;<span class="hljs-name">phase</span>&gt;</span>package<span class="hljs-tag">&lt;/<span class="hljs-name">phase</span>&gt;</span>
            <span class="hljs-tag">&lt;<span class="hljs-name">goals</span>&gt;</span>
                <span class="hljs-tag">&lt;<span class="hljs-name">goal</span>&gt;</span>single<span class="hljs-tag">&lt;/<span class="hljs-name">goal</span>&gt;</span>
            <span class="hljs-tag">&lt;/<span class="hljs-name">goals</span>&gt;</span>
        <span class="hljs-tag">&lt;/<span class="hljs-name">execution</span>&gt;</span>
    <span class="hljs-tag">&lt;/<span class="hljs-name">executions</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">plugin</span>&gt;</span>
</code></pre>
<p>Once your <code>pom.xml</code> is properly set up, you can build the JAR with dependencies by running the <code>package</code> phase, which the plugin configuration above binds the <code>single</code> goal to:</p>
<pre><code class="hljs css language-bash">$ mvn package
</code></pre>
<p>By default, this creates a JAR in your project's <code>target</code> folder with the name <code>PROJECT-NAME-VERSION-jar-with-dependencies.jar</code>. Here's an example topology submission command using a compiled JAR:</p>
<pre><code class="hljs css language-bash">$ mvn package
$ heron submit <span class="hljs-built_in">local</span> \
  target/my-project-1.2.3-jar-with-dependencies.jar \
  com.example.Main \
  MyTopology arg1 arg2
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="streamlet-api-topology-configuration"></a><a href="#streamlet-api-topology-configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Streamlet API topology configuration</h2>
<p>Every Streamlet API topology needs to be configured using a <code>Config</code> object. Here's an example default configuration:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.<span class="hljs-type">Config</span>
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.scala.{<span class="hljs-type">Builder</span>, <span class="hljs-type">Runner</span>}

<span class="hljs-keyword">val</span> topologyConfig = <span class="hljs-type">Config</span>.defaultConfig()

<span class="hljs-comment">// Define the processing graph on this builder before running the topology</span>
<span class="hljs-keyword">val</span> topologyBuilder = <span class="hljs-type">Builder</span>.newBuilder()

<span class="hljs-comment">// Apply topology configuration using the topologyConfig object</span>
<span class="hljs-keyword">val</span> topologyRunner = <span class="hljs-keyword">new</span> <span class="hljs-type">Runner</span>()
topologyRunner.run(<span class="hljs-string">"name-for-topology"</span>, topologyConfig, topologyBuilder)
</code></pre>
<p>The table below shows the configurable parameters for Heron topologies:</p>
<table>
<thead>
<tr><th style="text-align:left">Parameter</th><th style="text-align:left">Default</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="#delivery-semantics">Delivery semantics</a></td><td style="text-align:left">At most once</td></tr>
<tr><td style="text-align:left">Serializer</td><td style="text-align:left"><a href="https://github.com/EsotericSoftware/kryo">Kryo</a></td></tr>
<tr><td style="text-align:left">Total number of containers</td><td style="text-align:left">2</td></tr>
<tr><td style="text-align:left">Per-container CPU</td><td style="text-align:left">1.0</td></tr>
<tr><td style="text-align:left">Per-container RAM</td><td style="text-align:left">100 MB</td></tr>
</tbody>
</table>
<p>Here's an example non-default configuration:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> topologyConfig = <span class="hljs-type">Config</span>.newBuilder()
        .setNumContainers(<span class="hljs-number">5</span>)
        .setPerContainerRamInGigabytes(<span class="hljs-number">10</span>)
        .setPerContainerCpu(<span class="hljs-number">3.5</span>f)
        .setDeliverySemantics(<span class="hljs-type">Config</span>.<span class="hljs-type">DeliverySemantics</span>.<span class="hljs-type">EFFECTIVELY_ONCE</span>)
        .setSerializer(<span class="hljs-type">Config</span>.<span class="hljs-type">Serializer</span>.<span class="hljs-type">JAVA</span>)
        .setUserConfig(<span class="hljs-string">"some-key"</span>, <span class="hljs-string">"some-value"</span>)
        .build()
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="delivery-semantics"></a><a href="#delivery-semantics" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Delivery semantics</h3>
<p>You can apply <a href="/docs/heron-delivery-semantics">delivery semantics</a> to a Streamlet API topology when you build its <code>Config</code>:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> topologyConfig = <span class="hljs-type">Config</span>.newBuilder()
        .setDeliverySemantics(<span class="hljs-type">Config</span>.<span class="hljs-type">DeliverySemantics</span>.<span class="hljs-type">EFFECTIVELY_ONCE</span>)
        .build()
</code></pre>
<p>The other available options in the <code>DeliverySemantics</code> enum are <code>ATMOST_ONCE</code> and <code>ATLEAST_ONCE</code>.</p>
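<p>Because <code>DeliverySemantics</code> is a Java enum, one way to make the setting configurable is to look it up by name. The following is a minimal sketch, assuming <code>heron-api</code> is on the classpath; the <code>SemanticsFromFlag</code> object and its <code>parse</code> helper are hypothetical names introduced for illustration and are not part of the Heron API.</p>
<pre><code class="hljs css language-scala">import org.apache.heron.streamlet.Config

object SemanticsFromFlag {
  // Hypothetical helper: maps an optional flag such as "atleast_once" onto the enum,
  // falling back to the documented default (at most once) when no flag is given
  def parse(flag: Option[String]): Config.DeliverySemantics =
    flag
      .map(f =&gt; Config.DeliverySemantics.valueOf(f.trim.toUpperCase))
      .getOrElse(Config.DeliverySemantics.ATMOST_ONCE)

  def main(args: Array[String]): Unit = {
    // The first command-line argument, if present, selects the delivery semantics
    val topologyConfig = Config.newBuilder()
      .setDeliverySemantics(parse(args.headOption))
      .build()

    println(topologyConfig.getDeliverySemantics)
  }
}
</code></pre>
<p>Note that <code>valueOf</code> throws an <code>IllegalArgumentException</code> for a name that doesn't match one of the enum constants, so a real topology would probably want to validate the flag first.</p>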
<h2><a class="anchor" aria-hidden="true" id="streamlets"></a><a href="#streamlets" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Streamlets</h2>
<p>In the Heron Streamlet API for Scala, processing graphs consist of <a href="/docs/heron-streamlet-concepts">streamlets</a>. One or more supplier streamlets inject data into your graph to be processed by downstream operators.</p>
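<p>To make this concrete, here is a minimal sketch of a complete topology with a single supplier streamlet whose elements are logged. It assumes <code>heron-api</code> is on the classpath; the object name <code>RandomIntTopology</code>, the streamlet name, and the default topology name are illustrative choices rather than anything Heron requires.</p>
<pre><code class="hljs css language-scala">import java.util.concurrent.ThreadLocalRandom

import org.apache.heron.streamlet.Config
import org.apache.heron.streamlet.scala.{Builder, Runner}

object RandomIntTopology {
  // Illustrative fallback used when no topology name is passed on the command line
  val DefaultName = "random-int-topology"

  def main(args: Array[String]): Unit = {
    val builder = Builder.newBuilder()

    // Supplier streamlet: each call to the function injects one random integer (0 to 99)
    builder
      .newSource(() =&gt; ThreadLocalRandom.current().nextInt(100))
      .setName("random-ints")
      .log() // terminate the graph by logging every element

    val name = args.headOption.getOrElse(DefaultName)
    new Runner().run(name, Config.defaultConfig(), builder)
  }
}
</code></pre>
<p>Packaged as a JAR with dependencies, a topology like this could be submitted with <code>heron submit</code>, passing the object's fully qualified name as the main class.</p>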
<h2><a class="anchor" aria-hidden="true" id="operations"></a><a href="#operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Operations</h2>
<table>
<thead>
<tr><th style="text-align:left">Operation</th><th style="text-align:left">Description</th><th style="text-align:left">Example</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="#map-operations"><code>map</code></a></td><td style="text-align:left">Create a new streamlet by applying the supplied mapping function to each element in the original streamlet</td><td style="text-align:left">Add 1 to each element in a streamlet of integers</td></tr>
<tr><td style="text-align:left"><a href="#flatmap-operations"><code>flatMap</code></a></td><td style="text-align:left">Like a map operation, except that the mapping function returns a collection for each element and the results are flattened into a single streamlet</td><td style="text-align:left">Flatten a sentence into individual words</td></tr>
<tr><td style="text-align:left"><a href="#filter-operations"><code>filter</code></a></td><td style="text-align:left">Create a new streamlet containing only the elements that satisfy the supplied filtering function</td><td style="text-align:left">Remove all inappropriate words from a streamlet of strings</td></tr>
<tr><td style="text-align:left"><a href="#union-operations"><code>union</code></a></td><td style="text-align:left">Create a new streamlet by merging two streamlets into one, without modifying their elements</td><td style="text-align:left">Unite two different <code>Streamlet[String]</code>s into a single streamlet</td></tr>
<tr><td style="text-align:left"><a href="#clone-operations"><code>clone</code></a></td><td style="text-align:left">Creates any number of identical copies of a streamlet</td><td style="text-align:left">Create three separate streamlets from the same source</td></tr>
<tr><td style="text-align:left"><a href="#transform-operations"><code>transform</code></a></td><td style="text-align:left">Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations)</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><a href="#join-operations"><code>join</code></a></td><td style="text-align:left">Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key. Supported Join Types: Inner (as default), Outer-Left, Outer-Right and Outer</td><td style="text-align:left">Combine key-value pairs listing current scores (e.g. <code>(&quot;h4x0r&quot;, 127)</code>) for each user into a single per-user stream</td></tr>
<tr><td style="text-align:left"><a href="#key-by-operations"><code>keyBy</code></a></td><td style="text-align:left">Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet</td></tr>
<tr><td style="text-align:left"><a href="#reduce-by-key-operations"><code>reduceByKey</code></a></td><td style="text-align:left">Produces a streamlet of key-value on each key, and in accordance with a reduce function that you apply to all the accumulated values</td><td style="text-align:left">Count the number of times a value has been encountered</td></tr>
<tr><td style="text-align:left"><a href="#reduce-by-key-and-window-operations"><code>reduceByKeyAndWindow</code></a></td><td style="text-align:left">Produces a streamlet of key-value on each key, within a time window, and in accordance with a reduce function that you apply to all the accumulated values</td><td style="text-align:left">Count the number of times a value has been encountered within a specified time window</td></tr>
<tr><td style="text-align:left"><a href="#count-by-key-operations"><code>countByKey</code></a></td><td style="text-align:left">A special reduce operation of counting number of tuples on each key</td><td style="text-align:left">Count the number of times a value has been encountered</td></tr>
<tr><td style="text-align:left"><a href="#count-by-key-and-window-operations"><code>countByKeyAndWindow</code></a></td><td style="text-align:left">A special reduce operation of counting number of tuples on each key, within a time window</td><td style="text-align:left">Count the number of times a value has been encountered within a specified time window</td></tr>
<tr><td style="text-align:left"><a href="#split-operations"><code>split</code></a></td><td style="text-align:left">Split a streamlet into multiple streams, each with its own id</td></tr>
<tr><td style="text-align:left"><a href="#with-stream-operations"><code>withStream</code></a></td><td style="text-align:left">Select the stream with a given id from a streamlet that contains multiple streams</td></tr>
<tr><td style="text-align:left"><a href="#apply-operator-operations"><code>applyOperator</code></a></td><td style="text-align:left">Returns a new streamlet by applying a user-defined operator to the original streamlet</td><td style="text-align:left">Apply an existing bolt as an operator</td></tr>
<tr><td style="text-align:left"><a href="#repartition-operations"><code>repartition</code></a></td><td style="text-align:left">Create a new streamlet by applying a new parallelism level to the original streamlet</td><td style="text-align:left">Increase the parallelism of a streamlet from 5 to 10</td></tr>
<tr><td style="text-align:left"><a href="#sink-operations"><code>toSink</code></a></td><td style="text-align:left">Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc.</td><td style="text-align:left">Store processing graph results in an AWS Redshift table</td></tr>
<tr><td style="text-align:left"><a href="#log-operations"><code>log</code></a></td><td style="text-align:left">Logs the final results of a processing graph to stdout. This <em>must</em> be the last step in the graph.</td></tr>
<tr><td style="text-align:left"><a href="#consume-operations"><code>consume</code></a></td><td style="text-align:left">Consume operations are like sink operations except they don't require implementing a full sink interface (consume operations are thus suited for simple operations like logging)</td><td style="text-align:left">Log processing graph results using a custom formatting function</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="map-operations"></a><a href="#map-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Map operations</h3>
<p>Map operations create a new streamlet by applying the supplied mapping function to each element in the original streamlet. Here's an example:</p>
<pre><code class="hljs css language-scala">builder.newSource(() =&gt; <span class="hljs-number">1</span>)
    .map[<span class="hljs-type">Int</span>]((i: <span class="hljs-type">Int</span>) =&gt; i + <span class="hljs-number">12</span>) <span class="hljs-comment">// or, using placeholder syntax: .map[Int](_ + 12)</span>
</code></pre>
<p>In this example, a supplier streamlet emits an indefinite series of 1s. The <code>map</code> operation then adds 12 to each incoming element, producing a streamlet of 13s.</p>
<h3><a class="anchor" aria-hidden="true" id="flatmap-operations"></a><a href="#flatmap-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>FlatMap operations</h3>
<p>FlatMap operations are like <code>map</code> operations but with the important difference that the mapping function returns a collection for each element, and the elements of those collections are &quot;flattened&quot; into the resulting streamlet. In this example, a supplier streamlet emits the same sentence over and over again; the <code>flatMap</code> operation splits each sentence into individual words and emits each word as a separate element:</p>
<pre><code class="hljs css language-scala">builder.newSource(() =&gt; <span class="hljs-string">"I have nothing to declare but my genius"</span>)
    .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>))
</code></pre>
<p>The effect of this operation is to transform a <code>Streamlet[String]</code> of sentences into a <code>Streamlet[String]</code> of individual words, rather than into a streamlet of word collections.</p>
<blockquote>
<p>One of the core differences between <code>map</code> and <code>flatMap</code> operations is that <code>map</code> emits exactly one output element for each input element, whereas <code>flatMap</code> can emit zero, one, or many.</p>
</blockquote>
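<p>The same distinction can be seen on ordinary Scala collections. The following is a minimal sketch that uses a finite <code>List</code> as a stand-in for a streamlet; the object and value names are illustrative and are not part of the Heron API.</p>
<pre><code class="hljs css language-scala">object MapVersusFlatMap {
  // A finite stand-in for the streamlet of sentences
  val sentences: List[String] = List("I have nothing to declare but my genius")

  // map: exactly one output element (a whole list of words) per sentence
  val mapped: List[List[String]] = sentences.map(_.split(" ").toList)

  // flatMap: the words themselves become the output elements
  val flattened: List[String] = sentences.flatMap(_.split(" ").toList)

  def main(args: Array[String]): Unit = {
    println(mapped.length)    // prints 1
    println(flattened.length) // prints 8
  }
}
</code></pre>
<p>One sentence goes in either way, but <code>map</code> yields a single nested element while <code>flatMap</code> yields eight separate words.</p>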
<h3><a class="anchor" aria-hidden="true" id="filter-operations"></a><a href="#filter-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Filter operations</h3>
<p>Filter operations create a new streamlet that retains only those elements of the original streamlet for which the provided filtering function returns <code>true</code>; all other elements are excluded. Here's an example:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> java.util.concurrent.<span class="hljs-type">ThreadLocalRandom</span>

builder.newSource(() =&gt; <span class="hljs-type">ThreadLocalRandom</span>.current().nextInt(<span class="hljs-number">1</span>, <span class="hljs-number">11</span>))
        .filter(_.&lt;(<span class="hljs-number">7</span>))
</code></pre>
<p>In this example, a source streamlet consisting of random integers between 1 and 10 is modified by a filter operation that retains only the elements lower than 7 and removes all others.</p>
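<p>Because it is easy to read a filtering function the wrong way round, the following minimal sketch applies the same predicate to an ordinary Scala <code>List</code> used as a finite stand-in for the streamlet. The object and value names are illustrative and are not part of the Heron API.</p>
<pre><code class="hljs css language-scala">object FilterSketch {
  // A finite stand-in for the source streamlet: every integer from 1 to 10
  val source: List[Int] = (1 to 10).toList

  // Same predicate as in the streamlet example: keep an element only if it is lower than 7
  val kept: List[Int] = source.filter(_ &lt; 7)

  def main(args: Array[String]): Unit =
    println(kept) // prints List(1, 2, 3, 4, 5, 6)
}
</code></pre>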
<h3><a class="anchor" aria-hidden="true" id="union-operations"></a><a href="#union-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Union operations</h3>
<p>Union operations combine two streamlets of the same type into a single streamlet without modifying the elements. Here's an example:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> flowers = builder.newSource(() =&gt; <span class="hljs-string">"flower"</span>)
<span class="hljs-keyword">val</span> butterflies = builder.newSource(() =&gt; <span class="hljs-string">"butterfly"</span>)

<span class="hljs-keyword">val</span> combinedSpringStreamlet = flowers.union(butterflies)
</code></pre>
<p>Here, one streamlet is an endless series of &quot;flower&quot; strings while the other is an endless series of &quot;butterfly&quot; strings. The <code>union</code> operation combines them into a single streamlet that contains the elements of both, interleaved in no guaranteed order.</p>
<h3><a class="anchor" aria-hidden="true" id="clone-operations"></a><a href="#clone-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Clone operations</h3>
<p>Clone operations enable you to create any number of &quot;copies&quot; of a streamlet. Each of the &quot;copy&quot; streamlets contains all the elements of the original and can be manipulated just like the original streamlet. Here's an example:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> scala.util.<span class="hljs-type">Random</span>

<span class="hljs-keyword">val</span> integers = builder.newSource(() =&gt; <span class="hljs-type">Random</span>.nextInt(<span class="hljs-number">100</span>))

<span class="hljs-keyword">val</span> copies = integers.clone(<span class="hljs-number">5</span>)
<span class="hljs-keyword">val</span> ints1 = copies(<span class="hljs-number">0</span>)
<span class="hljs-keyword">val</span> ints2 = copies(<span class="hljs-number">1</span>)
<span class="hljs-keyword">val</span> ints3 = copies(<span class="hljs-number">2</span>)
<span class="hljs-comment">// and so on...</span>
</code></pre>
<p>In this example, a streamlet of random integers between 0 and 99 is cloned into 5 identical streamlets.</p>
<h3><a class="anchor" aria-hidden="true" id="transform-operations"></a><a href="#transform-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Transform operations</h3>
<p>Transform operations are highly flexible operations that are most useful for:</p>
<ul>
<li>operations involving state in <a href="../../concepts/delivery-semantics#stateful-topologies">stateful topologies</a></li>
<li>operations that don't neatly fit into the other categories or into a lambda-based logic</li>
</ul>
<p>Transform operations require you to implement three different methods:</p>
<ul>
<li>A <code>setup</code> function that enables you to pass a context object to the operation and to specify what happens prior to the <code>transform</code> step</li>
<li>A <code>transform</code> operation that performs the desired transformation</li>
<li>A <code>cleanup</code> function that allows you to specify what happens after the <code>transform</code> step</li>
</ul>
<p>The context object available to a transform operation provides access to:</p>
<ul>
<li>the current state of the topology</li>
<li>the topology's configuration</li>
<li>the name of the stream</li>
<li>the stream partition</li>
<li>the current task ID</li>
</ul>
<p>Here's a Scala example of a transform operation in a topology where a stateful record is kept of the number of items processed:</p>
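<pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> java.util.concurrent.atomic.<span class="hljs-type">AtomicLong</span>
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.<span class="hljs-type">Context</span>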
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.scala.<span class="hljs-type">SerializableTransformer</span>

<span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">CountNumberOfItems</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">SerializableTransformer</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>] </span>{
    <span class="hljs-keyword">private</span> <span class="hljs-keyword">val</span> numberOfItems = <span class="hljs-keyword">new</span> <span class="hljs-type">AtomicLong</span>()

    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">setup</span></span>(context: <span class="hljs-type">Context</span>): <span class="hljs-type">Unit</span> = {
      numberOfItems.incrementAndGet()
      context.getState().put(<span class="hljs-string">"number-of-items"</span>, numberOfItems)
    }

    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">transform</span></span>(i: <span class="hljs-type">String</span>, f: <span class="hljs-type">String</span> =&gt; <span class="hljs-type">Unit</span>): <span class="hljs-type">Unit</span> = {
      <span class="hljs-keyword">val</span> transformedString = i.toUpperCase
      f(transformedString)
    }

    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">cleanup</span></span>(): <span class="hljs-type">Unit</span> =
      println(<span class="hljs-string">s"Successfully processed new state: <span class="hljs-subst">$numberOfItems</span>"</span>)
  }
</code></pre>
<p>This operation does a few things:</p>
<ul>
<li>In the <code>setup</code> method, the <a href="/api/java/org/apache/heron/streamlet/Context.html"><code>Context</code></a> object is used to access the current state (which has the semantics of a Java <code>Map</code>). The current number of items processed is incremented by one and then saved as the new state.</li>
<li>In the <code>transform</code> method, the incoming string is converted to upper case and then passed to the supplied function <code>f</code>, which &quot;accepts&quot; it as the new value.</li>
<li>In the <code>cleanup</code> step, the current count of items processed is logged.</li>
</ul>
<p>Here's that operation within the context of a streamlet processing graph:</p>
<pre><code class="hljs css language-scala">builder.newSource(() =&gt; <span class="hljs-string">"Some string over and over"</span>)
        .transform(<span class="hljs-keyword">new</span> <span class="hljs-type">CountNumberOfItems</span>())
        .log()
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="join-operations"></a><a href="#join-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Join operations</h3>
<blockquote>
<p>For a more in-depth conceptual discussion of joins, see the <a href="../../../concepts/streamlet-api#join-operations">Heron Streamlet API</a> doc.</p>
</blockquote>
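<p>Join operations unify two streamlets <em>on a key</em> (join operations thus require KV streamlets). Each <code>KeyValue</code> object in a streamlet has, by definition, a key. When a <code>join</code> operation is added to a processing graph, you supply the other streamlet, a key extractor for each of the two streamlets, a window configuration, and a join function that combines the matched elements. Here's an example:</p>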
<pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.{<span class="hljs-type">Config</span>, <span class="hljs-type">KeyValue</span>, <span class="hljs-type">WindowConfig</span>}
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.scala.<span class="hljs-type">Builder</span>

<span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder()

<span class="hljs-keyword">val</span> streamlet1 = builder
  .newSource(() =&gt;
    <span class="hljs-keyword">new</span> <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>](<span class="hljs-string">"heron-api"</span>, <span class="hljs-string">"topology-api"</span>))
  .setName(<span class="hljs-string">"streamlet1"</span>)

<span class="hljs-keyword">val</span> streamlet2 = builder
  .newSource(() =&gt;
    <span class="hljs-keyword">new</span> <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>](<span class="hljs-string">"heron-api"</span>, <span class="hljs-string">"streamlet-api"</span>))
  .setName(<span class="hljs-string">"streamlet2"</span>)

streamlet1.join[<span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>], <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>], <span class="hljs-type">String</span>](
  streamlet2,
  (kv: <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>]) =&gt; kv,
  (kv: <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>]) =&gt; kv,
  <span class="hljs-type">WindowConfig</span>.<span class="hljs-type">TumblingCountWindow</span>(<span class="hljs-number">10</span>),
  (kv1: <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>], kv2: <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>]) =&gt;
    kv1.getValue + <span class="hljs-string">" - "</span> + kv2.getValue
)
</code></pre>
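<p>In this case, both source streamlets emit an indefinite series of <code>KeyValue</code> objects with the key <code>heron-api</code> but different values (<code>topology-api</code> and <code>streamlet-api</code>). For each pair of elements matched within a tumbling window of 10 elements, the join function emits the two values combined into a single string (<code>topology-api - streamlet-api</code>).</p>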
<blockquote>
<p>The effect of a <code>join</code> operation is to create a new streamlet <em>for each key</em>.</p>
</blockquote>
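<p>To make the difference between join types concrete, here is a minimal sketch of an inner and an outer-left join on ordinary Scala collections, where each <code>List</code> stands in for the contents of one window. The sample data, the names, and the use of <code>Option</code> for a missing match are assumptions made for illustration; they are not taken from the Heron API.</p>
<pre><code class="hljs css language-scala">object JoinSketch {
  // Finite stand-ins for the contents of one window of each streamlet
  val left: List[(String, String)] =
    List(("heron-api", "topology-api"), ("other-key", "unmatched"))
  val right: List[(String, String)] =
    List(("heron-api", "streamlet-api"))

  // Inner join: emit a result only when both sides contain the key
  val inner: List[(String, String)] =
    for {
      (leftKey, leftValue) &lt;- left
      (rightKey, rightValue) &lt;- right
      if leftKey == rightKey
    } yield (leftKey, s"$leftValue - $rightValue")

  // Outer-left join: keep every left element; None marks a missing right-hand match
  val rightByKey: Map[String, String] = right.toMap
  val outerLeft: List[(String, (String, Option[String]))] =
    left.map { case (key, value) =&gt; (key, (value, rightByKey.get(key))) }

  def main(args: Array[String]): Unit = {
    println(inner)
    println(outerLeft)
  }
}
</code></pre>
<p>The inner join drops <code>other-key</code> because it has no counterpart on the right, while the outer-left join keeps it with an empty right-hand side.</p>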
<h3><a class="anchor" aria-hidden="true" id="key-by-operations"></a><a href="#key-by-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Key by operations</h3>
<p>Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet. Here is an example:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder()

builder
  .newSource(() =&gt; <span class="hljs-string">"Paco de Lucia is one of the most popular virtuoso"</span>)
  <span class="hljs-comment">// Convert each sentence into individual words</span>
  .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>))
  .keyBy[<span class="hljs-type">String</span>, <span class="hljs-type">Int</span>](
      <span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span>
      (word: <span class="hljs-type">String</span>) =&gt; word,
      <span class="hljs-comment">// Value extractor (get the length of each word)</span>
      (word: <span class="hljs-type">String</span>) =&gt; word.length
  )
  <span class="hljs-comment">// The result is logged</span>
  .log();
</code></pre>
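<p>The role of the two extractors can be sketched on an ordinary Scala <code>List</code>, used here as a finite stand-in for the streamlet of words. The object and value names are illustrative and are not part of the Heron API.</p>
<pre><code class="hljs css language-scala">object KeyBySketch {
  // A finite stand-in for the streamlet of words
  val words: List[String] = "Paco de Lucia".split(" ").toList

  // The same extractors as in the streamlet example
  val keyExtractor: String =&gt; String = word =&gt; word
  val valueExtractor: String =&gt; Int = word =&gt; word.length

  // Every element becomes one key-value pair
  val keyed: List[(String, Int)] =
    words.map(word =&gt; (keyExtractor(word), valueExtractor(word)))

  def main(args: Array[String]): Unit =
    println(keyed) // prints List((Paco,4), (de,2), (Lucia,5))
}
</code></pre>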
<h3><a class="anchor" aria-hidden="true" id="reduce-by-key-operations"></a><a href="#reduce-by-key-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reduce by key operations</h3>
<p>You can apply <a href="https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html">reduce</a> operations to streamlets by specifying:</p>
<ul>
<li>a key extractor that determines what counts as the key for the streamlet</li>
<li>a value extractor that determines which final value is chosen for each element of the streamlet</li>
<li>a reduce function that produces a single value for each key in the streamlet</li>
</ul>
<p>Reduce by key operations produce a new streamlet of key-value pairs, each consisting of the extracted key and the value calculated for it by the reduce function. Here's an example:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder()

builder
  .newSource(() =&gt; <span class="hljs-string">"Paco de Lucia is one of the most popular virtuoso"</span>)
  <span class="hljs-comment">// Convert each sentence into individual words</span>
  .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>))
  .reduceByKey[<span class="hljs-type">String</span>, <span class="hljs-type">Int</span>](
      <span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span>
      (word: <span class="hljs-type">String</span>) =&gt; word,
      <span class="hljs-comment">// Value extractor (each word appears only once, hence the value is always 1)</span>
      (word: <span class="hljs-type">String</span>) =&gt; <span class="hljs-number">1</span>,
      <span class="hljs-comment">// Reduce operation (a running sum)</span>
      (x: <span class="hljs-type">Int</span>, y: <span class="hljs-type">Int</span>) =&gt; x + y)
  <span class="hljs-comment">// The result is logged</span>
  .log();
</code></pre>
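<p>How the three functions cooperate can be sketched on an ordinary Scala <code>List</code>. This is a batch analogy under stated assumptions: a streaming reduce updates its result as elements arrive, whereas this sketch only shows the final totals for a finite input. The sample sentence and all names are illustrative and are not part of the Heron API.</p>
<pre><code class="hljs css language-scala">object ReduceByKeySketch {
  // A finite stand-in for the streamlet of words
  val words: List[String] = "to be or not to be".split(" ").toList

  val counts: Map[String, Int] =
    words
      .groupBy(word =&gt; word) // key extractor: the word itself is the key
      .map { case (key, group) =&gt;
        val values = group.map(_ =&gt; 1)  // value extractor: each occurrence counts as 1
        (key, values.reduce(_ + _))      // reduce function: sum the values for this key
      }

  def main(args: Array[String]): Unit =
    println(counts)
}
</code></pre>
<p>Here <code>to</code> and <code>be</code> each end up with a count of 2, while <code>or</code> and <code>not</code> each end up with 1.</p>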
<h3><a class="anchor" aria-hidden="true" id="reduce-by-key-and-window-operations"></a><a href="#reduce-by-key-and-window-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reduce by key and window operations</h3>
<p>You can apply <a href="https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html">reduce</a> operations to streamlets by specifying:</p>
<ul>
<li>a key extractor that determines what counts as the key for the streamlet</li>
<li>a value extractor that determines which final value is chosen for each element of the streamlet</li>
<li>a <a href="../../../concepts/topologies#window-operations">time window</a> across which the operation will take place</li>
<li>a reduce function that produces a single value for each key in the streamlet</li>
</ul>
<p>Reduce by key and window operations produce a new streamlet of key-value window objects (which include a key-value pair including the extracted key and calculated value, as well as information about the window in which the operation took place). Here's an example:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.<span class="hljs-type">WindowConfig</span>;

<span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder()

builder
  .newSource(() =&gt; <span class="hljs-string">"Paco de Lucia is one of the most popular virtuoso"</span>)
  <span class="hljs-comment">// Convert each sentence into individual words</span>
  .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>))
  .reduceByKeyAndWindow[<span class="hljs-type">String</span>, <span class="hljs-type">Int</span>](
      <span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span>
      (word: <span class="hljs-type">String</span>) =&gt; word,
      <span class="hljs-comment">// Value extractor (each word appears only once, hence the value is always 1)</span>
      (word: <span class="hljs-type">String</span>) =&gt; <span class="hljs-number">1</span>,
      <span class="hljs-comment">// Window configuration</span>
      <span class="hljs-type">WindowConfig</span>.<span class="hljs-type">TumblingCountWindow</span>(<span class="hljs-number">50</span>),
      <span class="hljs-comment">// Reduce operation (a running sum)</span>
      (x: <span class="hljs-type">Int</span>, y: <span class="hljs-type">Int</span>) =&gt; x + y)
  <span class="hljs-comment">// The result is logged</span>
  .log();
</code></pre>
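<p>A tumbling count window groups consecutive elements into non-overlapping batches of a fixed size, and the reduce is applied to each batch independently. The following minimal sketch imitates that with <code>grouped</code> on an ordinary Scala <code>List</code>, using a window size of 3 instead of 50 to keep the data small. The sample data and all names are illustrative and are not part of the Heron API.</p>
<pre><code class="hljs css language-scala">object TumblingCountWindowSketch {
  // A finite stand-in for the streamlet of words
  val words: List[String] = "a b a c b a".split(" ").toList

  // Consecutive, non-overlapping batches of 3 elements
  val windows: List[List[String]] = words.grouped(3).toList

  // Count the words of each window separately; counts never carry over
  val countsPerWindow: List[Map[String, Int]] =
    windows.map { window =&gt;
      window.groupBy(word =&gt; word).map { case (key, group) =&gt;
        (key, group.map(_ =&gt; 1).sum)
      }
    }

  def main(args: Array[String]): Unit =
    countsPerWindow.foreach(println)
}
</code></pre>
<p>The word <code>a</code> is counted twice in the first window and once in the second, rather than three times overall.</p>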
<h3><a class="anchor" aria-hidden="true" id="count-by-key-operations"></a><a href="#count-by-key-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Count by key operations</h3>
<p>Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered. Here's an example:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder()

builder
  .newSource(() =&gt; <span class="hljs-string">"Paco de Lucia is one of the most popular virtuoso"</span>)
  <span class="hljs-comment">// Convert each sentence into individual words</span>
  .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>))
  <span class="hljs-comment">// Count the number of occurrences of each word</span>
  .countByKey[<span class="hljs-type">String</span>]((word: <span class="hljs-type">String</span>) =&gt; word)
  <span class="hljs-comment">// The result is logged</span>
  .log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="count-by-key-and-window-operations"></a><a href="#count-by-key-and-window-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Count by key and window operations</h3>
<p>Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each <a href="../../../concepts/topologies#window-operations">time window</a>. Here's an example:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder()

builder
  .newSource(() =&gt; <span class="hljs-string">"Paco de Lucia is one of the most popular virtuoso"</span>)
  <span class="hljs-comment">// Convert each sentence into individual words</span>
  .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>))
  <span class="hljs-comment">// Count the number of occurrences of each word within each time window</span>
  .countByKeyAndWindow[<span class="hljs-type">String</span>](
      (word: <span class="hljs-type">String</span>) =&gt; word,
      <span class="hljs-type">WindowConfig</span>.<span class="hljs-type">TumblingCountWindow</span>(<span class="hljs-number">50</span>))
  <span class="hljs-comment">// The result is logged</span>
  .log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="split-operations"></a><a href="#split-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Split operations</h3>
<p>Split operations split a streamlet into multiple streams with different ids. Each stream id is associated with a predicate that decides which items of the original streamlet are emitted to that stream. Here is an example:</p>
<pre><code class="hljs css language-scala">val builder = Builder.newBuilder()

builder
  .newSource(() =&gt; "Paco de Lucia is one of the most popular virtuoso")
  // Convert each sentence into individual words
  .flatMap[String](_.split(" "))
  // Split the words into two streams according to their length
  .split(Map(
      "long_word" -&gt; { word: String =&gt; word.length &gt;= 4 },
      "short_word" -&gt; { word: String =&gt; word.length &lt; 4 }
  ))
  // Select the stream that contains the short words
  .withStream("short_word")
  // The result is logged
  .log();
</code></pre>
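<p>The combination of <code>split</code> and <code>withStream</code> can be sketched on ordinary Scala collections. The sketch assumes that each stream id receives every element accepted by its predicate, so an element may reach several streams or none. The sample data and all names are illustrative and are not part of the Heron API.</p>
<pre><code class="hljs css language-scala">object SplitSketch {
  // A finite stand-in for the streamlet of words
  val words: List[String] = "Paco de Lucia is one".split(" ").toList

  // Stream ids mapped to the predicates that select their elements
  val splitFns: Map[String, String =&gt; Boolean] = Map(
    "long_word" -&gt; ((word: String) =&gt; word.length &gt;= 4),
    "short_word" -&gt; ((word: String) =&gt; word.length &lt; 4)
  )

  // Each stream id receives the elements accepted by its predicate
  val streams: Map[String, List[String]] =
    splitFns.map { case (id, predicate) =&gt; (id, words.filter(predicate)) }

  // Counterpart of withStream("short_word"): pick one stream by its id
  val shortWords: List[String] = streams("short_word")

  def main(args: Array[String]): Unit =
    println(shortWords) // prints List(de, is, one)
}
</code></pre>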
<h3><a class="anchor" aria-hidden="true" id="with-stream-operations"></a><a href="#with-stream-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>With stream operations</h3>
<p>With stream operations select the stream with a given id from a streamlet that contains multiple streams. They are often used together with <a href="#split-operations">split</a> operations.</p>
<h3><a class="anchor" aria-hidden="true" id="apply-operator-operations"></a><a href="#apply-operator-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Apply operator operations</h3>
<p>Apply operator operations apply a user-defined operator (such as a bolt) to each element of the original streamlet and return a new streamlet. Here is an example:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder()

<span class="hljs-keyword">private</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">MyBoltOperator</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">MyBolt</span></span>
    <span class="hljs-keyword">with</span> <span class="hljs-type">IStreamletOperator</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>] {
}

builder
  .newSource(() =&gt; <span class="hljs-string">"Paco de Lucia is one of the most popular virtuoso"</span>)
  <span class="hljs-comment">// Convert each sentence into individual words</span>
  .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>))
  <span class="hljs-comment">// Apply user defined operation</span>
  .applyOperator(<span class="hljs-keyword">new</span> <span class="hljs-type">MyBoltOperator</span>())
  <span class="hljs-comment">// The result is logged</span>
  .log();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="repartition-operations"></a><a href="#repartition-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Repartition operations</h3>
<p>When you assign a number of <a href="#partitioning-and-parallelism">partitions</a> to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a <code>map</code> operation, then any <code>mapToKV</code>, <code>flatMap</code>, <code>filter</code>, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using <code>repartition</code>. Here's an example:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> java.util.concurrent.<span class="hljs-type">ThreadLocalRandom</span>;

<span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder

<span class="hljs-keyword">val</span> numbers = builder
  .newSource(() =&gt; <span class="hljs-type">ThreadLocalRandom</span>.current().nextInt(<span class="hljs-number">1</span>, <span class="hljs-number">11</span>))

numbers
  .setNumPartitions(<span class="hljs-number">5</span>)
  .map(i =&gt; i + <span class="hljs-number">1</span>)
  .repartition(<span class="hljs-number">2</span>)
  .filter(i =&gt; i &gt; <span class="hljs-number">2</span> &amp;&amp; i &lt; <span class="hljs-number">7</span>)
  .log()
</code></pre>
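<p>In this example, the supplier streamlet emits random integers between 1 and 10 (the upper bound of 11 passed to <code>nextInt</code> is exclusive). The source is assigned 5 partitions, which the <code>map</code> operation inherits. After the <code>map</code> operation, the <code>repartition</code> function is used to assign 2 partitions to all downstream operations, so the <code>filter</code> and <code>log</code> operations each run with 2 partitions.</p>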
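<p>The inheritance rule can be sketched in plain Scala. This is only an illustrative model, not the Heron API: <code>Step</code> and <code>effectivePartitions</code> are hypothetical names, and the default of 1 partition for a step with no upstream setting is an assumption of the sketch.</p>
<pre><code class="hljs css language-scala">object PartitionInheritanceSketch {
  // Hypothetical model of a processing step: a name plus an optional explicit partition count.
  final case class Step(name: String, explicitPartitions: Option[Int] = None)

  // Walk the pipeline in order; a step without an explicit count inherits the previous step's count.
  def effectivePartitions(steps: Seq[Step], default: Int = 1): Seq[(String, Int)] =
    steps.scanLeft(("", default)) { case ((_, inherited), step) =&gt;
      (step.name, step.explicitPartitions.getOrElse(inherited))
    }.tail // drop the seed element used to start the scan

  def main(args: Array[String]): Unit = {
    // Mirrors the pipeline above: 5 partitions on the source, then repartition(2).
    val pipeline = Seq(
      Step("source", Some(5)),
      Step("map"),
      Step("repartition", Some(2)),
      Step("filter"),
      Step("log")
    )
    effectivePartitions(pipeline).foreach { case (name, n) =&gt; println(s"$name: $n") }
  }
}
</code></pre>
<p>Under these assumptions the sketch reports 5 partitions for the source and <code>map</code> steps and 2 partitions for <code>repartition</code>, <code>filter</code>, and <code>log</code>.</p>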
<h3><a class="anchor" aria-hidden="true" id="sink-operations"></a><a href="#sink-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Sink operations</h3>
<p>In processing graphs like the ones you build using the Heron Streamlet API, <strong>sinks</strong> are essentially the terminal points in your graph, where your processing logic comes to an end. A processing graph can end with writing to a database, publishing to a topic in a pub-sub messaging system, and so on. With the Streamlet API, you can implement your own custom sinks. Here's an example:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.<span class="hljs-type">Context</span>
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.scala.<span class="hljs-type">Sink</span>

<span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">FormattedLogSink</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">Sink</span>[<span class="hljs-type">String</span>] </span>{
    <span class="hljs-keyword">private</span> <span class="hljs-keyword">var</span> streamName: <span class="hljs-type">Option</span>[<span class="hljs-type">String</span>] = <span class="hljs-type">None</span>

    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">setup</span></span>(context: <span class="hljs-type">Context</span>): <span class="hljs-type">Unit</span> =
      streamName = <span class="hljs-type">Some</span>(context.getStreamName)

    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">put</span></span>(tuple: <span class="hljs-type">String</span>): <span class="hljs-type">Unit</span> =
      println(<span class="hljs-string">s"The current value of tuple is <span class="hljs-subst">$tuple</span> in stream: <span class="hljs-subst">$streamName</span>"</span>)

    <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">cleanup</span></span>(): <span class="hljs-type">Unit</span> = {}
}
</code></pre>
<p>In this example, the sink fetches the stream name from the context passed to the <code>setup</code> method. The <code>put</code> method specifies how the sink handles each element that it receives (in this case, a formatted message is printed to stdout). The <code>cleanup</code> method lets you specify what happens when the sink is shut down, such as releasing resources; in this example it does nothing.</p>
<p>Here is the <code>FormattedLogSink</code> at work in an example processing graph:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder

builder.newSource(() =&gt; <span class="hljs-string">"Here is a string to be passed to the sink"</span>)
        .toSink(<span class="hljs-keyword">new</span> <span class="hljs-type">FormattedLogSink</span>)
</code></pre>
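<p>The order in which the three sink methods are invoked can be illustrated with a small plain-Scala model. This is a sketch under stated assumptions, not Heron code: <code>SimpleSink</code>, <code>RecordingSink</code>, and <code>runSink</code> are hypothetical names, the stream name is passed directly instead of through a <code>Context</code>, and the driver assumes that <code>setup</code> is called once before any element and <code>cleanup</code> once after the last one. In a real topology the Heron runtime, not your code, makes these calls.</p>
<pre><code class="hljs css language-scala">import scala.collection.mutable.ArrayBuffer

object SinkLifecycleSketch {
  // Hypothetical stand-in for a sink interface with the same three methods.
  trait SimpleSink[T] {
    def setup(streamName: String): Unit
    def put(tuple: T): Unit
    def cleanup(): Unit
  }

  // Records every call so that the order of lifecycle events can be inspected.
  class RecordingSink extends SimpleSink[String] {
    val events: ArrayBuffer[String] = ArrayBuffer.empty[String]
    override def setup(streamName: String): Unit = events += s"setup($streamName)"
    override def put(tuple: String): Unit = events += s"put($tuple)"
    override def cleanup(): Unit = events += "cleanup"
  }

  // Assumed driver: setup once, put once per element, cleanup once at the end.
  def runSink[T](sink: SimpleSink[T], streamName: String, elements: Seq[T]): Unit = {
    sink.setup(streamName)
    elements.foreach(sink.put)
    sink.cleanup()
  }

  def main(args: Array[String]): Unit = {
    val sink = new RecordingSink
    runSink(sink, "example-stream", Seq("a", "b"))
    // prints: setup(example-stream) -&gt; put(a) -&gt; put(b) -&gt; cleanup
    println(sink.events.mkString(" -&gt; "))
  }
}
</code></pre>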
<blockquote>
<p><a href="#log-operations">Log operations</a> rely on a log sink that is provided out of the box. You'll need to implement other sinks yourself.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="log-operations"></a><a href="#log-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Log operations</h3>
<p>Log operations are special cases of consume operations that log streamlet elements to stdout.</p>
<blockquote>
<p>Streamlet elements are logged using their <code>toString</code> representations at the <code>INFO</code> level.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="consume-operations"></a><a href="#consume-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consume operations</h3>
<p>Consume operations are like <a href="#sink-operations">sink operations</a> except they don't require implementing a full sink interface. Consume operations are thus suited for simple operations like formatted logging. Here's an example:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> scala.util.<span class="hljs-type">Random</span>

<span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder

builder
  .newSource(() =&gt; <span class="hljs-type">Random</span>.nextInt(<span class="hljs-number">10</span>))
  .filter(i =&gt; i % <span class="hljs-number">2</span> == <span class="hljs-number">0</span>)
  .consume(i =&gt; println(<span class="hljs-string">s"Even number found: <span class="hljs-subst">$i</span>"</span>))
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/topology-development-topology-api-python"><span class="arrow-prev">← </span><span>The Heron Topology API for Python</span></a><a class="docs-next button" href="/docs/client-api-docs-overview"><span>Client API Docs</span><span class="arrow-next"> →</span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#getting-started">Getting started</a><ul class="toc-headings"><li><a href="#maven-setup">Maven setup</a></li></ul></li><li><a href="#streamlet-api-topology-configuration">Streamlet API topology configuration</a><ul class="toc-headings"><li><a href="#delivery-semantics">Delivery semantics</a></li></ul></li><li><a href="#streamlets">Streamlets</a></li><li><a href="#operations">Operations</a><ul class="toc-headings"><li><a href="#map-operations">Map operations</a></li><li><a href="#flatmap-operations">FlatMap operations</a></li><li><a href="#filter-operations">Filter operations</a></li><li><a href="#union-operations">Union operations</a></li><li><a href="#clone-operations">Clone operations</a></li><li><a href="#transform-operations">Transform operations</a></li><li><a href="#join-operations">Join operations</a></li><li><a href="#key-by-operations">Key by operations</a></li><li><a href="#reduce-by-key-operations">Reduce by key operations</a></li><li><a href="#reduce-by-key-and-window-operations">Reduce by key and window operations</a></li><li><a href="#count-by-key-operations">Count by key operations</a></li><li><a href="#count-by-key-and-window-operations">Count by key and window operations</a></li><li><a href="#split-operations">Split operations</a></li><li><a href="#with-stream-operations">With stream operations</a></li><li><a href="#apply-operator-operations">Apply operator operations</a></li><li><a href="#repartition-operations">Repartition operations</a></li><li><a href="#sink-operations">Sink operations</a></li><li><a 
href="#log-operations">Log operations</a></li><li><a href="#consume-operations">Consume operations</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><div class="apache-disclaimer">Apache Heron is an effort undergoing incubation at <a target="_blank" href="https://apache.org/">The Apache Software Foundation (ASF)</a> sponsored by the Apache Incubator PMC. Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects. While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.<br/><br/>Apache®, the names of Apache projects, and the feather logo are either <a rel="external" href="https://www.apache.org/foundation/marks/list/">registered trademarks or trademarks</a> of the Apache Software Foundation in the United States and/or other countries.<br/><br/><div class="copyright-box">Copyright © 2023 the Apache Software Foundation, Apache Heron, Heron, 
  Apache, the Apache feather Logo, and the Apache Heron project logo are either registered 
  trademarks or trademarks of the Apache Software Foundation.</div></div><div class="apache-links"><a class="item" rel="external" href="https://incubator.apache.org/">Apache Incubator</a><div><a class="item" rel="external" href="https://www.apache.org/">About the ASF</a></div><div><a class="item" rel="external" href="https://www.apache.org/events/current-event">Events</a></div><div><a class="item" rel="external" href="https://www.apache.org/foundation/thanks.html">Thanks</a></div><div><a class="item" rel="external" href="https://www.apache.org/foundation/sponsorship.html">Become a Sponsor</a></div><div><a class="item" rel="external" href="https://www.apache.org/security/">Security</a></div><div><a class="item" rel="external" href="https://www.apache.org/licenses/">License</a></div></div></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>